Finish first pass update_node impl

This commit is contained in:
James Wilson
2021-06-30 12:14:17 +01:00
parent c5ca84ee9a
commit 770739c7c8
4 changed files with 379 additions and 85 deletions
+90 -50
View File
@@ -13,7 +13,7 @@ use std::{net::{IpAddr, Ipv4Addr}, str::FromStr};
use futures::channel::{ mpsc };
use futures::{ SinkExt, StreamExt };
use std::collections::{ HashMap, HashSet };
use crate::state::{ self, State, NodeId };
use crate::state::{ self, State, NodeId, OnUpdateNode };
use crate::feed_message::{ self, FeedMessageSerializer };
use crate::find_location;
@@ -201,7 +201,7 @@ impl InnerLoop {
));
let chain_genesis_hash = self.node_state
.get_node_chain(node_id)
.get_chain_by_node_id(node_id)
.map(|chain| *chain.genesis_hash());
if let Some(chain_genesis_hash) = chain_genesis_hash {
@@ -279,48 +279,14 @@ impl InnerLoop {
self.remove_nodes_and_broadcast_result(Some(node_id)).await;
},
FromShardWebsocket::Update { local_id, payload } => {
// TODO: Fill this all in...
let node_id = match self.node_ids.get_by_right(&(shard_conn_id, local_id)) {
Some(id) => id,
None => return
Some(id) => *id,
None => {
log::error!("Cannot find ID for node with shard/connectionId of {}/{}", shard_conn_id, local_id);
return
}
};
if let Some(block) = payload.best_block() {
}
match payload {
node::Payload::SystemInterval(system_interval) => {
},
node::Payload::AfgAuthoritySet(_) => {
},
node::Payload::AfgFinalized(_) => {
},
node::Payload::AfgReceivedPrecommit(_) => {
},
node::Payload::AfgReceivedPrevote(_) => {
},
// This message should have been handled before the payload made it this far:
node::Payload::SystemConnected(_) => {
unreachable!("SystemConnected message seen in Telemetry Core, but should have been handled in shard");
},
// The following messages aren't handled at the moment. List them explicitly so
// that we have to make an explicit choice for any new messages:
node::Payload::BlockImport(_) |
node::Payload::NotifyFinalized(_) |
node::Payload::AfgReceivedCommit(_) |
node::Payload::TxPoolImport |
node::Payload::AfgFinalizedBlocksUpTo |
node::Payload::AuraPreSealedBlock |
node::Payload::PreparedBlockForProposing => {},
}
// TODO: node_state.update_node, then handle returned diffs
self.handle_from_shard_update(node_id, payload).await;
},
FromShardWebsocket::Disconnected => {
// Find all nodes associated with this shard connection ID:
@@ -336,6 +302,67 @@ impl InnerLoop {
}
}
async fn handle_from_shard_update(&mut self, node_id: NodeId, payload: node::Payload) {
let mut feed_message_serializer = FeedMessageSerializer::new();
let mut broadcast_finality = false;
self.node_state.update_node(node_id, payload, |msg| {
match msg {
OnUpdateNode::StaleNode(to_feed) => {
feed_message_serializer.push(to_feed);
}
OnUpdateNode::BestBlock(to_feed) => {
feed_message_serializer.push(to_feed);
}
OnUpdateNode::BestFinalized(to_feed) => {
feed_message_serializer.push(to_feed);
}
OnUpdateNode::ImportedBlock(to_feed) => {
feed_message_serializer.push(to_feed);
}
OnUpdateNode::Hardware(to_feed) => {
feed_message_serializer.push(to_feed);
}
OnUpdateNode::NodeStatsUpdate(to_feed) => {
feed_message_serializer.push(to_feed);
}
OnUpdateNode::NodeIOUpdate(to_feed) => {
feed_message_serializer.push(to_feed);
}
OnUpdateNode::AfgFinalized(to_feed) => {
feed_message_serializer.push(to_feed);
// All messages sent in an update leading to this message are only
// broadcast to feeds subscribed to chain finality info
broadcast_finality = true;
}
OnUpdateNode::AfgReceivedPrecommit(to_feed) => {
feed_message_serializer.push(to_feed);
// All messages sent in an update leading to this message are only
// broadcast to feeds subscribed to chain finality info
broadcast_finality = true;
}
OnUpdateNode::AfgReceivedPrevote(to_feed) => {
feed_message_serializer.push(to_feed);
// All messages sent in an update leading to this message are only
// broadcast to feeds subscribed to chain finality info
broadcast_finality = true;
}
OnUpdateNode::FinalizedBlock(to_feed) => {
feed_message_serializer.push(to_feed);
}
}
});
if let Some(chain) = self.node_state.get_chain_by_node_id(node_id) {
let genesis_hash = *chain.genesis_hash();
if broadcast_finality {
self.finalize_and_broadcast_to_chain_finality_feeds(&genesis_hash, feed_message_serializer).await;
} else {
self.finalize_and_broadcast_to_chain_feeds(&genesis_hash, feed_message_serializer).await;
}
}
}
/// Handle messages coming from feeds.
async fn handle_from_feed(&mut self, feed_conn_id: ConnId, msg: FromFeedWebsocket) {
log::debug!("Message from feed ({}): {:?}", feed_conn_id, msg);
@@ -465,7 +492,7 @@ impl InnerLoop {
// Group by chain to simplify the handling of feed messages:
let mut node_ids_per_chain: HashMap<BlockHash,Vec<NodeId>> = HashMap::new();
for node_id in node_ids.into_iter() {
if let Some(chain) = self.node_state.get_node_chain(node_id) {
if let Some(chain) = self.node_state.get_chain_by_node_id(node_id) {
node_ids_per_chain.entry(*chain.genesis_hash()).or_default().push(node_id);
}
}
@@ -537,13 +564,8 @@ impl InnerLoop {
/// Send a message to all chain feeds.
async fn broadcast_to_chain_feeds(&mut self, genesis_hash: &BlockHash, message: ToFeedWebsocket) {
println!("BROADCAST TO CHAIN FEEDS, {}, \n\n{:?}\n\n{:?}\n\n", genesis_hash, self.chain_to_feed_conn_ids, self.feed_conn_id_to_chain);
if let Some(feeds) = self.chain_to_feed_conn_ids.get(genesis_hash) {
for &feed_id in feeds {
// How much faster would it be if we processed these in parallel?
// Is it practical to do so given lifetimes and such?
if let Some(chan) = self.feed_channels.get_mut(&feed_id) {
let _ = chan.send(message.clone()).await;
}
@@ -561,9 +583,27 @@ println!("BROADCAST TO CHAIN FEEDS, {}, \n\n{:?}\n\n{:?}\n\n", genesis_hash, sel
/// Send a message to everybody.
async fn broadcast_to_all_feeds(&mut self, message: ToFeedWebsocket) {
for chan in self.feed_channels.values_mut() {
// How much faster would it be if we processed these in parallel?
// Is it practical to do so given lifetimes and such?
let _ = chan.send(message.clone()).await;
}
}
/// Finalize a [`FeedMessageSerializer`] and broadcast the result to chain finality feeds
async fn finalize_and_broadcast_to_chain_finality_feeds(&mut self, genesis_hash: &BlockHash, serializer: FeedMessageSerializer) {
if let Some(bytes) = serializer.into_finalized() {
self.broadcast_to_chain_finality_feeds(genesis_hash, ToFeedWebsocket::Bytes(bytes)).await;
}
}
/// Send a message to all chain finality feeds.
async fn broadcast_to_chain_finality_feeds(&mut self, genesis_hash: &BlockHash, message: ToFeedWebsocket) {
if let Some(feeds) = self.chain_to_feed_conn_ids.get(genesis_hash) {
// Get all feeds for the chain, but only broadcast to those feeds that
// are also subscribed to receive finality updates.
for &feed_id in feeds.union(&self.feed_conn_id_finality) {
if let Some(chan) = self.feed_channels.get_mut(&feed_id) {
let _ = chan.send(message.clone()).await;
}
}
}
}
}
+252 -9
View File
@@ -1,19 +1,20 @@
use std::sync::Arc;
use std::collections::{ HashSet, HashMap };
use common::types::{ BlockHash };
use common::types::{Block, NodeDetails, NodeLocation, Timestamp};
use std::collections::{ HashSet };
use common::types::{ BlockHash, BlockNumber };
use common::types::{Block, Timestamp};
use common::util::{now, DenseMap, NumStats};
use common::most_seen::{ MostSeen, self };
use common::most_seen::MostSeen;
use common::node::Payload;
use std::iter::IntoIterator;
use once_cell::sync::Lazy;
use crate::feed_message;
use super::node::Node;
use super::NodeId;
pub type ChainId = usize;
pub type Label = Box<str>;
const STALE_TIMEOUT: u64 = 2 * 60 * 1000; // 2 minutes
pub struct Chain {
/// Labels that nodes use for this chain. We keep track of
/// the most commonly used label as nodes are added/removed.
@@ -45,6 +46,54 @@ pub struct RemoveNodeResult {
pub chain_renamed: bool
}
/// When we update a node, we subscribe and receive various messages
/// about the update that take this form. The expectation is that we'll
/// push and broadcast these messages to feeds. This allows the caller to
/// retain control over exactly how/when that will happen.
pub enum OnUpdateNode<'a> {
StaleNode(feed_message::StaleNode),
BestBlock(feed_message::BestBlock),
BestFinalized(feed_message::BestFinalized),
ImportedBlock(feed_message::ImportedBlock<'a>),
Hardware(feed_message::Hardware<'a>),
NodeStatsUpdate(feed_message::NodeStatsUpdate<'a>),
NodeIOUpdate(feed_message::NodeIOUpdate<'a>),
AfgFinalized(feed_message::AfgFinalized),
AfgReceivedPrecommit(feed_message::AfgReceivedPrecommit),
AfgReceivedPrevote(feed_message::AfgReceivedPrevote),
FinalizedBlock(feed_message::FinalizedBlock)
}
macro_rules! into_on_update {
($name:ident) => {
impl <'a> From<feed_message::$name> for OnUpdateNode<'a> {
fn from(val: feed_message::$name) -> Self {
OnUpdateNode::$name(val)
}
}
}
}
macro_rules! into_on_update_lt {
($name:ident) => {
impl <'a> From<feed_message::$name<'a>> for OnUpdateNode<'a> {
fn from(val: feed_message::$name<'a>) -> Self {
OnUpdateNode::$name(val)
}
}
}
}
into_on_update!(StaleNode);
into_on_update!(BestBlock);
into_on_update!(BestFinalized);
into_on_update_lt!(ImportedBlock);
into_on_update_lt!(Hardware);
into_on_update_lt!(NodeStatsUpdate);
into_on_update_lt!(NodeIOUpdate);
into_on_update!(AfgFinalized);
into_on_update!(AfgReceivedPrecommit);
into_on_update!(AfgReceivedPrevote);
into_on_update!(FinalizedBlock);
/// Labels of chains we consider "first party". These chains allow any
/// number of nodes to connect.
static FIRST_PARTY_NETWORKS: Lazy<HashSet<&'static str>> = Lazy::new(|| {
@@ -107,6 +156,200 @@ impl Chain {
}
}
/// Attempt to update the best block seen in this chain.
pub fn update_node<OnUpdate>(&mut self, all_nodes: &mut DenseMap<Node>, nid: NodeId, payload: Payload, mut on_update: OnUpdate)
where OnUpdate: FnMut(OnUpdateNode)
{
if let Some(block) = payload.best_block() {
self.handle_block(all_nodes, block, nid, &mut on_update);
}
if let Some(node) = all_nodes.get_mut(nid) {
match payload {
Payload::SystemInterval(ref interval) => {
if node.update_hardware(interval) {
on_update(feed_message::Hardware(nid, node.hardware()).into());
}
if let Some(stats) = node.update_stats(interval) {
on_update(feed_message::NodeStatsUpdate(nid, stats).into());
}
if let Some(io) = node.update_io(interval) {
on_update(feed_message::NodeIOUpdate(nid, io).into());
}
}
Payload::AfgAuthoritySet(authority) => {
node.set_validator_address(authority.authority_id.clone());
return;
}
Payload::AfgFinalized(finalized) => {
if let Ok(finalized_number) = finalized.finalized_number.parse::<BlockNumber>()
{
if let Some(addr) = node.details().validator.clone() {
on_update(feed_message::AfgFinalized(
addr,
finalized_number,
finalized.finalized_hash,
).into());
}
}
return;
}
Payload::AfgReceivedPrecommit(precommit) => {
if let Ok(finalized_number) =
precommit.target_number.parse::<BlockNumber>()
{
if let Some(addr) = node.details().validator.clone() {
let voter = precommit.voter.clone();
on_update(feed_message::AfgReceivedPrecommit(
addr,
finalized_number,
precommit.target_hash,
voter,
).into());
}
}
return;
}
Payload::AfgReceivedPrevote(prevote) => {
if let Ok(finalized_number) =
prevote.target_number.parse::<BlockNumber>()
{
if let Some(addr) = node.details().validator.clone() {
let voter = prevote.voter.clone();
on_update(feed_message::AfgReceivedPrevote(
addr,
finalized_number,
prevote.target_hash,
voter,
).into());
}
}
return;
}
Payload::AfgReceivedCommit(_) => {}
_ => (),
}
if let Some(block) = payload.finalized_block() {
if let Some(finalized) = node.update_finalized(block) {
on_update(feed_message::FinalizedBlock(
nid,
finalized.height,
finalized.hash,
).into());
if finalized.height > self.finalized.height {
self.finalized = *finalized;
on_update(feed_message::BestFinalized(finalized.height, finalized.hash).into());
}
}
}
}
}
fn handle_block<OnUpdate>(&mut self, all_nodes: &mut DenseMap<Node>, block: &Block, nid: NodeId, mut on_update: OnUpdate)
where OnUpdate: FnMut(OnUpdateNode)
{
let mut propagation_time = None;
let now = now();
let nodes_len = self.node_ids.len();
self.update_stale_nodes(all_nodes, now, &mut on_update);
let node = match all_nodes.get_mut(nid) {
Some(node) => node,
None => return,
};
if node.update_block(*block) {
if block.height > self.best.height {
self.best = *block;
log::debug!(
"[{}] [nodes={}] new best block={}/{:?}",
self.labels.best(),
nodes_len,
self.best.height,
self.best.hash,
);
if let Some(timestamp) = self.timestamp {
self.block_times.push(now - timestamp);
self.average_block_time = Some(self.block_times.average());
}
self.timestamp = Some(now);
on_update(feed_message::BestBlock(
self.best.height,
now,
self.average_block_time,
).into());
propagation_time = Some(0);
} else if block.height == self.best.height {
if let Some(timestamp) = self.timestamp {
propagation_time = Some(now - timestamp);
}
}
if let Some(details) = node.update_details(now, propagation_time) {
on_update(feed_message::ImportedBlock(nid, details).into());
}
}
}
/// Check if the chain is stale (has not received a new best block in a while).
/// If so, find a new best block, ignoring any stale nodes and marking them as such.
fn update_stale_nodes<OnUpdate>(&mut self, all_nodes: &mut DenseMap<Node>, now: u64, mut on_update: OnUpdate)
where OnUpdate: FnMut(OnUpdateNode)
{
let threshold = now - STALE_TIMEOUT;
let timestamp = match self.timestamp {
Some(ts) => ts,
None => return,
};
if timestamp > threshold {
// Timestamp is in range, nothing to do
return;
}
let mut best = Block::zero();
let mut finalized = Block::zero();
let mut timestamp = None;
for &nid in self.node_ids.iter() {
let node = match all_nodes.get_mut(nid) {
Some(node) => node,
None => continue
};
if !node.update_stale(threshold) {
if node.best().height > best.height {
best = *node.best();
timestamp = Some(node.best_timestamp());
}
if node.finalized().height > finalized.height {
finalized = *node.finalized();
}
} else {
on_update(feed_message::StaleNode(nid).into());
}
}
if self.best.height != 0 || self.finalized.height != 0 {
self.best = best;
self.finalized = finalized;
self.block_times.reset();
self.timestamp = timestamp;
on_update(feed_message::BestBlock(
self.best.height,
timestamp.unwrap_or(now),
None,
).into());
on_update(feed_message::BestFinalized(finalized.height, finalized.hash).into());
}
}
pub fn label(&self) -> &str {
&self.labels.best()
}
@@ -119,8 +362,8 @@ impl Chain {
pub fn best_block(&self) -> &Block {
&self.best
}
pub fn timestamp(&self) -> Timestamp {
self.timestamp.unwrap_or(0)
pub fn timestamp(&self) -> Option<Timestamp> {
self.timestamp
}
pub fn average_block_time(&self) -> Option<u64> {
self.average_block_time
+1 -3
View File
@@ -1,7 +1,5 @@
use std::sync::Arc;
use common::types::{
Block, BlockDetails, NodeDetails, NodeHardware, NodeIO, NodeId, NodeLocation, NodeStats,
Block, BlockDetails, NodeDetails, NodeHardware, NodeIO, NodeLocation, NodeStats,
Timestamp,
};
use common::util::now;
+36 -23
View File
@@ -1,9 +1,8 @@
use std::sync::Arc;
use std::collections::{ HashSet, HashMap };
use common::types::{ BlockHash };
use super::node::Node;
use common::types::{Block, NodeDetails, NodeLocation, Timestamp};
use common::util::{now, DenseMap, NumStats};
use common::types::{Block, NodeDetails, Timestamp};
use common::util::{DenseMap};
use common::node::Payload;
use std::iter::IntoIterator;
use crate::find_location;
@@ -53,7 +52,10 @@ pub struct NodeAddedToChain<'a> {
pub has_chain_label_changed: bool
}
// if removing a node is successful, we get this information back.
/// During a node update, we get given various messages about the update
pub type OnUpdateNode<'a> = chain::OnUpdateNode<'a>;
/// if removing a node is successful, we get this information back.
pub struct RemovedNode {
/// How many nodes remain on the chain (0 if the chain was removed)
pub chain_node_count: usize,
@@ -94,6 +96,13 @@ impl State {
.map(move |(_,chain)| StateChain { state: self, chain })
}
pub fn get_chain_by_node_id(&self, node_id: NodeId) -> Option<StateChain<'_>> {
self.chains_by_node
.get(&node_id)
.and_then(|&chain_id| self.chains.get(chain_id))
.map(|chain| StateChain { state: self, chain })
}
pub fn get_chain_by_genesis_hash(&self, genesis_hash: &BlockHash) -> Option<StateChain<'_>> {
self.chains_by_genesis_hash
.get(genesis_hash)
@@ -207,28 +216,32 @@ impl State {
})
}
/// Attempt to update the best block seen, given a node and block.
pub fn update_node<OnUpdate>(&mut self, node_id: NodeId, payload: Payload, on_update: OnUpdate)
where OnUpdate: FnMut(OnUpdateNode)
{
let chain_id = match self.chains_by_node.get(&node_id) {
Some(chain_id) => *chain_id,
None => { log::error!("Cannot find chain_id for node with ID {}", node_id); return }
};
let chain = match self.chains.get_mut(chain_id) {
Some(chain) => chain,
None => { log::error!("Cannot find chain for node with ID {}", node_id); return }
};
chain.update_node(&mut self.nodes, node_id, payload, on_update);
}
/// Update the location for a node. Return `false` if the node was not found.
pub fn update_node_location(&mut self, node_id: NodeId, location: find_location::Location) -> bool {
if let Some(node) = self.get_node_mut(node_id) {
if let Some(node) = self.nodes.get_mut(node_id) {
node.update_location(location);
true
} else {
false
}
}
/// Get the chain that a node belongs to.
pub fn get_node_chain(&self, node_id: NodeId) -> Option<StateChain<'_>> {
self.chains_by_node
.get(&node_id)
.and_then(|&chain_id| self.chains.get(chain_id))
.map(|chain| StateChain { state: self, chain })
}
/// Obtain mutable access to a node, if it's found.
fn get_node_mut(&mut self, node_id: NodeId) -> Option<&mut Node> {
self.nodes.get_mut(node_id)
}
}
@@ -255,7 +268,7 @@ impl <'a> StateChain<'a> {
self.chain.best_block()
}
pub fn timestamp(&self) -> Timestamp {
self.chain.timestamp()
self.chain.timestamp().unwrap_or(0)
}
pub fn average_block_time(&self) -> Option<u64> {
self.chain.average_block_time()
@@ -335,21 +348,21 @@ mod test {
let chain1_genesis = BlockHash::from_low_u64_be(1);
state.add_node(chain1_genesis, node("A", "Chain One")); // 0
assert_eq!(state.get_node_chain(0).expect("Chain should exist").label(), "Chain One");
assert_eq!(state.get_chain_by_node_id(0).expect("Chain should exist").label(), "Chain One");
assert!(state.get_chain_by_label("Chain One").is_some());
assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some());
state.add_node(chain1_genesis, node("B", "Chain Two")); // 1
// Chain name hasn't changed yet; "Chain One" as common as "Chain Two"..
assert_eq!(state.get_node_chain(0).expect("Chain should exist").label(), "Chain One");
assert_eq!(state.get_chain_by_node_id(0).expect("Chain should exist").label(), "Chain One");
assert!(state.get_chain_by_label("Chain One").is_some());
assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some());
state.add_node(chain1_genesis, node("B", "Chain Two")); // 2
// Chain name has changed; "Chain Two" the winner now..
assert_eq!(state.get_node_chain(0).expect("Chain should exist").label(), "Chain Two");
assert_eq!(state.get_chain_by_node_id(0).expect("Chain should exist").label(), "Chain Two");
assert!(state.get_chain_by_label("Chain One").is_none());
assert!(state.get_chain_by_label("Chain Two").is_some());
assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some());
@@ -358,7 +371,7 @@ mod test {
state.remove_node(2).expect("Removal OK (id: 2");
// Removed both "Chain Two" nodes; dominant name now "Chain One" again..
assert_eq!(state.get_node_chain(0).expect("Chain should exist").label(), "Chain One");
assert_eq!(state.get_chain_by_node_id(0).expect("Chain should exist").label(), "Chain One");
assert!(state.get_chain_by_label("Chain One").is_some());
assert!(state.get_chain_by_label("Chain Two").is_none());
assert!(state.get_chain_by_genesis_hash(&chain1_genesis).is_some());