mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-05-30 01:51:08 +00:00
WIP filling in core aggregator match arms and various other tweaks
This commit is contained in:
Generated
+2
@@ -1193,10 +1193,12 @@ dependencies = [
|
|||||||
"hex",
|
"hex",
|
||||||
"http",
|
"http",
|
||||||
"log",
|
"log",
|
||||||
|
"once_cell",
|
||||||
"primitive-types",
|
"primitive-types",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"simple_logger",
|
"simple_logger",
|
||||||
|
"smallvec",
|
||||||
"soketto",
|
"soketto",
|
||||||
"structopt",
|
"structopt",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
use std::net::IpAddr;
|
use std::net::IpAddr;
|
||||||
|
|
||||||
use crate::node::Payload;
|
use crate::node::Payload;
|
||||||
use crate::types::{NodeDetails};
|
use crate::types::{NodeDetails, BlockHash};
|
||||||
use crate::assign_id::Id;
|
use crate::assign_id::Id;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
@@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
|
|||||||
pub type LocalId = Id;
|
pub type LocalId = Id;
|
||||||
|
|
||||||
/// A global ID assigned to messages from each different pair of ConnId+LocalId.
|
/// A global ID assigned to messages from each different pair of ConnId+LocalId.
|
||||||
pub type GlobalId = Id;
|
pub type GlobalId = usize;
|
||||||
|
|
||||||
/// Message sent from the shard to the backend core
|
/// Message sent from the shard to the backend core
|
||||||
#[derive(Deserialize, Serialize, Debug, Clone)]
|
#[derive(Deserialize, Serialize, Debug, Clone)]
|
||||||
@@ -20,6 +20,7 @@ pub enum FromShardAggregator {
|
|||||||
ip: Option<IpAddr>,
|
ip: Option<IpAddr>,
|
||||||
node: NodeDetails,
|
node: NodeDetails,
|
||||||
local_id: LocalId,
|
local_id: LocalId,
|
||||||
|
genesis_hash: BlockHash
|
||||||
},
|
},
|
||||||
/// Send a message payload to update details for a node
|
/// Send a message payload to update details for a node
|
||||||
UpdateNode {
|
UpdateNode {
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
use common::{internal_messages::{self, LocalId}, node, assign_id::AssignId};
|
use common::{internal_messages::{self, LocalId}, node, assign_id::AssignId, types::BlockHash};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::atomic::AtomicU64;
|
use std::sync::atomic::AtomicU64;
|
||||||
use futures::{channel::mpsc, future};
|
use futures::{channel::mpsc, future};
|
||||||
@@ -42,6 +42,7 @@ pub enum FromWebsocket {
|
|||||||
message_id: node::NodeMessageId,
|
message_id: node::NodeMessageId,
|
||||||
ip: Option<std::net::IpAddr>,
|
ip: Option<std::net::IpAddr>,
|
||||||
node: common::types::NodeDetails,
|
node: common::types::NodeDetails,
|
||||||
|
genesis_hash: BlockHash
|
||||||
},
|
},
|
||||||
/// Update/pass through details about a node.
|
/// Update/pass through details about a node.
|
||||||
Update {
|
Update {
|
||||||
@@ -150,7 +151,7 @@ impl Aggregator {
|
|||||||
// connections where we mute one set of messages it sends and not others.
|
// connections where we mute one set of messages it sends and not others.
|
||||||
close_connections.push(close_connection);
|
close_connections.push(close_connection);
|
||||||
},
|
},
|
||||||
ToAggregator::FromWebsocket(conn_id, FromWebsocket::Add { message_id, ip, node }) => {
|
ToAggregator::FromWebsocket(conn_id, FromWebsocket::Add { message_id, ip, node, genesis_hash }) => {
|
||||||
// Don't bother doing anything else if we're disconnected, since we'll force the
|
// Don't bother doing anything else if we're disconnected, since we'll force the
|
||||||
// ndoe to reconnect anyway when the backend does:
|
// ndoe to reconnect anyway when the backend does:
|
||||||
if !connected_to_telemetry_core { continue }
|
if !connected_to_telemetry_core { continue }
|
||||||
@@ -162,6 +163,7 @@ impl Aggregator {
|
|||||||
let _ = tx_to_telemetry_core.send(FromShardAggregator::AddNode {
|
let _ = tx_to_telemetry_core.send(FromShardAggregator::AddNode {
|
||||||
ip,
|
ip,
|
||||||
node,
|
node,
|
||||||
|
genesis_hash,
|
||||||
local_id
|
local_id
|
||||||
}).await;
|
}).await;
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -157,6 +157,7 @@ async fn handle_websocket_connection<S>(mut websocket: ws::WebSocket, mut tx_to_
|
|||||||
message_id,
|
message_id,
|
||||||
ip: addr.map(|a| a.ip()),
|
ip: addr.map(|a| a.ip()),
|
||||||
node: info.node,
|
node: info.node,
|
||||||
|
genesis_hash: info.genesis_hash,
|
||||||
}).await;
|
}).await;
|
||||||
}
|
}
|
||||||
// Anything that's not an "Add" is an Update. The aggregator will ignore
|
// Anything that's not an "Add" is an Update. The aggregator will ignore
|
||||||
|
|||||||
@@ -12,10 +12,12 @@ futures = "0.3.15"
|
|||||||
hex = "0.4.3"
|
hex = "0.4.3"
|
||||||
http = "0.2.4"
|
http = "0.2.4"
|
||||||
log = "0.4.14"
|
log = "0.4.14"
|
||||||
|
once_cell = "1.8.0"
|
||||||
primitive-types = { version = "0.9.0", features = ["serde"] }
|
primitive-types = { version = "0.9.0", features = ["serde"] }
|
||||||
serde = { version = "1.0.126", features = ["derive"] }
|
serde = { version = "1.0.126", features = ["derive"] }
|
||||||
serde_json = "1.0.64"
|
serde_json = "1.0.64"
|
||||||
simple_logger = "1.11.0"
|
simple_logger = "1.11.0"
|
||||||
|
smallvec = "1.6.1"
|
||||||
soketto = "0.6.0"
|
soketto = "0.6.0"
|
||||||
structopt = "0.3.21"
|
structopt = "0.3.21"
|
||||||
thiserror = "1.0.25"
|
thiserror = "1.0.25"
|
||||||
|
|||||||
@@ -1,4 +1,9 @@
|
|||||||
use common::{internal_messages::{GlobalId, LocalId}, node, assign_id::AssignId};
|
use common::{
|
||||||
|
internal_messages::{GlobalId, LocalId},
|
||||||
|
node,
|
||||||
|
assign_id::AssignId,
|
||||||
|
util::now
|
||||||
|
};
|
||||||
use std::{str::FromStr, sync::Arc};
|
use std::{str::FromStr, sync::Arc};
|
||||||
use std::sync::atomic::AtomicU64;
|
use std::sync::atomic::AtomicU64;
|
||||||
use futures::channel::{ mpsc, oneshot };
|
use futures::channel::{ mpsc, oneshot };
|
||||||
@@ -7,6 +12,7 @@ use tokio::net::TcpStream;
|
|||||||
use tokio_util::compat::{ TokioAsyncReadCompatExt };
|
use tokio_util::compat::{ TokioAsyncReadCompatExt };
|
||||||
use std::collections::{ HashMap, HashSet };
|
use std::collections::{ HashMap, HashSet };
|
||||||
use crate::state::State;
|
use crate::state::State;
|
||||||
|
use crate::feed_message::{ self, FeedMessageSerializer };
|
||||||
|
|
||||||
/// A unique Id is assigned per websocket connection (or more accurately,
|
/// A unique Id is assigned per websocket connection (or more accurately,
|
||||||
/// per feed socket and per shard socket). This can be combined with the
|
/// per feed socket and per shard socket). This can be combined with the
|
||||||
@@ -33,6 +39,7 @@ pub enum FromShardWebsocket {
|
|||||||
local_id: LocalId,
|
local_id: LocalId,
|
||||||
ip: Option<std::net::IpAddr>,
|
ip: Option<std::net::IpAddr>,
|
||||||
node: common::types::NodeDetails,
|
node: common::types::NodeDetails,
|
||||||
|
genesis_hash: common::types::BlockHash
|
||||||
},
|
},
|
||||||
/// Update/pass through details about a node.
|
/// Update/pass through details about a node.
|
||||||
Update {
|
Update {
|
||||||
@@ -68,13 +75,9 @@ pub enum FromFeedWebsocket {
|
|||||||
chain: Box<str>
|
chain: Box<str>
|
||||||
},
|
},
|
||||||
/// The feed wants finality info for the chain, too.
|
/// The feed wants finality info for the chain, too.
|
||||||
SendFinality {
|
SendFinality,
|
||||||
chain: Box<str>
|
|
||||||
},
|
|
||||||
/// The feed doesn't want any more finality info for the chain.
|
/// The feed doesn't want any more finality info for the chain.
|
||||||
NoMoreFinality {
|
NoMoreFinality,
|
||||||
chain: Box<str>
|
|
||||||
},
|
|
||||||
/// An explicit ping message.
|
/// An explicit ping message.
|
||||||
Ping {
|
Ping {
|
||||||
chain: Box<str>
|
chain: Box<str>
|
||||||
@@ -92,8 +95,8 @@ impl FromStr for FromFeedWebsocket {
|
|||||||
match cmd {
|
match cmd {
|
||||||
"ping" => Ok(FromFeedWebsocket::Ping { chain }),
|
"ping" => Ok(FromFeedWebsocket::Ping { chain }),
|
||||||
"subscribe" => Ok(FromFeedWebsocket::Subscribe { chain }),
|
"subscribe" => Ok(FromFeedWebsocket::Subscribe { chain }),
|
||||||
"send-finality" => Ok(FromFeedWebsocket::SendFinality { chain }),
|
"send-finality" => Ok(FromFeedWebsocket::SendFinality),
|
||||||
"no-more-finality" => Ok(FromFeedWebsocket::NoMoreFinality { chain }),
|
"no-more-finality" => Ok(FromFeedWebsocket::NoMoreFinality),
|
||||||
_ => return Err(anyhow::anyhow!("Command {} not recognised", cmd))
|
_ => return Err(anyhow::anyhow!("Command {} not recognised", cmd))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -102,7 +105,7 @@ impl FromStr for FromFeedWebsocket {
|
|||||||
/// The aggregator can these messages back to a feed connection.
|
/// The aggregator can these messages back to a feed connection.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum ToFeedWebsocket {
|
pub enum ToFeedWebsocket {
|
||||||
|
Bytes(Vec<u8>)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
@@ -142,7 +145,7 @@ impl Aggregator {
|
|||||||
// any more, this task will gracefully end.
|
// any more, this task will gracefully end.
|
||||||
async fn handle_messages(mut rx_from_external: mpsc::Receiver<ToAggregator>, denylist: Vec<String>) {
|
async fn handle_messages(mut rx_from_external: mpsc::Receiver<ToAggregator>, denylist: Vec<String>) {
|
||||||
|
|
||||||
let mut node_state = State::new();
|
let mut node_state = State::new(denylist);
|
||||||
|
|
||||||
// Maintain mappings from the shard connection ID and local ID of messages to a global ID
|
// Maintain mappings from the shard connection ID and local ID of messages to a global ID
|
||||||
// that uniquely identifies nodes in our node state.
|
// that uniquely identifies nodes in our node state.
|
||||||
@@ -152,46 +155,124 @@ impl Aggregator {
|
|||||||
let mut feed_channels = HashMap::new();
|
let mut feed_channels = HashMap::new();
|
||||||
let mut shard_channels = HashMap::new();
|
let mut shard_channels = HashMap::new();
|
||||||
|
|
||||||
// What chains have aour feeds subscribed to (one at a time at the mo):
|
// What chains have our feeds subscribed to (one at a time at the mo)?
|
||||||
|
// Both of these need to be kept in sync (should move to own struct eventually).
|
||||||
let mut feed_conn_id_to_chain: HashMap<ConnId, Box<str>> = HashMap::new();
|
let mut feed_conn_id_to_chain: HashMap<ConnId, Box<str>> = HashMap::new();
|
||||||
let mut chain_to_feed_conn_ids: HashMap<Box<str>, HashSet<ConnId>> = HashMap::new();
|
let mut chain_to_feed_conn_ids: HashMap<Box<str>, HashSet<ConnId>> = HashMap::new();
|
||||||
|
|
||||||
|
// Which feeds want finality info too?
|
||||||
let mut feed_conn_id_finality: HashSet<ConnId> = HashSet::new();
|
let mut feed_conn_id_finality: HashSet<ConnId> = HashSet::new();
|
||||||
|
|
||||||
// Now, loop and receive messages to handle.
|
// Now, loop and receive messages to handle.
|
||||||
while let Some(msg) = rx_from_external.next().await {
|
while let Some(msg) = rx_from_external.next().await {
|
||||||
match msg {
|
match msg {
|
||||||
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Initialize { channel }) => {
|
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Initialize { mut channel }) => {
|
||||||
feed_channels.insert(feed_conn_id, channel);
|
feed_channels.insert(feed_conn_id, channel.clone());
|
||||||
|
|
||||||
// TODO: `feed::AddedChain` message to tell feed about current chains.
|
// Tell the new feed subscription some basic things to get it going:
|
||||||
},
|
let mut feed_serializer = FeedMessageSerializer::new();
|
||||||
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Ping { chain }) => {
|
feed_serializer.push(feed_message::Version(31));
|
||||||
// TODO: Return with feed::Pong(chain) feed message.
|
for chain in node_state.iter_chains() {
|
||||||
},
|
feed_serializer.push(feed_message::AddedChain(
|
||||||
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Subscribe { chain }) => {
|
chain.label(),
|
||||||
// Unsubscribe from previous chain if subscribed to one:
|
chain.node_count()
|
||||||
if let Some(feed_ids) = chain_to_feed_conn_ids.get_mut(&chain) {
|
));
|
||||||
feed_ids.remove(&feed_conn_id);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe to the new chain:
|
// Send this to the channel that subscribed:
|
||||||
feed_conn_id_to_chain.insert(feed_conn_id, chain.clone());
|
if let Some(bytes) = feed_serializer.into_finalized() {
|
||||||
chain_to_feed_conn_ids.entry(chain).or_default().insert(feed_conn_id);
|
let _ = channel.send(ToFeedWebsocket::Bytes(bytes)).await;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Ping { chain }) => {
|
||||||
|
let feed_channel = match feed_channels.get_mut(&feed_conn_id) {
|
||||||
|
Some(chan) => chan,
|
||||||
|
None => continue
|
||||||
|
};
|
||||||
|
|
||||||
|
// Pong!
|
||||||
|
let mut feed_serializer = FeedMessageSerializer::new();
|
||||||
|
feed_serializer.push(feed_message::Pong(&chain));
|
||||||
|
if let Some(bytes) = feed_serializer.into_finalized() {
|
||||||
|
let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await;
|
||||||
|
}
|
||||||
},
|
},
|
||||||
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::SendFinality { chain: _ }) => {
|
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Subscribe { chain }) => {
|
||||||
feed_conn_id_finality.insert(feed_conn_id);
|
let feed_channel = match feed_channels.get_mut(&feed_conn_id) {
|
||||||
// TODO: Do we care about the chain here?
|
Some(chan) => chan,
|
||||||
},
|
None => continue
|
||||||
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::NoMoreFinality { chain: _ }) => {
|
};
|
||||||
|
|
||||||
|
// Unsubscribe from previous chain if subscribed to one:
|
||||||
|
let old_chain_label = feed_conn_id_to_chain.remove(&feed_conn_id);
|
||||||
|
if let Some(old_chain_label) = &old_chain_label {
|
||||||
|
if let Some(map) = chain_to_feed_conn_ids.get_mut(old_chain_label) {
|
||||||
|
map.remove(&feed_conn_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Untoggle request for finality feeds:
|
||||||
|
feed_conn_id_finality.remove(&feed_conn_id);
|
||||||
|
|
||||||
|
// Get the chain we're subscribing to, ignoring the rest if it doesn't exist.
|
||||||
|
let chain = match node_state.get_chain_by_label(&chain) {
|
||||||
|
Some(chain) => chain,
|
||||||
|
None => continue
|
||||||
|
};
|
||||||
|
|
||||||
|
// Send messages to the feed about the new chain:
|
||||||
|
let mut feed_serializer = FeedMessageSerializer::new();
|
||||||
|
if let Some(old_chain_label) = old_chain_label {
|
||||||
|
feed_serializer.push(feed_message::UnsubscribedFrom(&old_chain_label));
|
||||||
|
}
|
||||||
|
feed_serializer.push(feed_message::SubscribedTo(chain.label()));
|
||||||
|
feed_serializer.push(feed_message::TimeSync(now()));
|
||||||
|
feed_serializer.push(feed_message::BestBlock (
|
||||||
|
chain.best_block().height,
|
||||||
|
chain.timestamp(),
|
||||||
|
chain.average_block_time()
|
||||||
|
));
|
||||||
|
feed_serializer.push(feed_message::BestFinalized (
|
||||||
|
chain.finalized_block().height,
|
||||||
|
chain.finalized_block().hash
|
||||||
|
));
|
||||||
|
for (idx, (gid, node)) in node_state.get_nodes_in_chain(chain).enumerate() {
|
||||||
|
// Send subscription confirmation and chain head before doing all the nodes,
|
||||||
|
// and continue sending batches of 32 nodes a time over the wire subsequently
|
||||||
|
if idx % 32 == 0 {
|
||||||
|
if let Some(bytes) = feed_serializer.finalize() {
|
||||||
|
let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
feed_serializer.push(feed_message::AddedNode(gid, node));
|
||||||
|
feed_serializer.push(feed_message::FinalizedBlock(
|
||||||
|
gid,
|
||||||
|
node.finalized().height,
|
||||||
|
node.finalized().hash,
|
||||||
|
));
|
||||||
|
if node.stale() {
|
||||||
|
feed_serializer.push(feed_message::StaleNode(gid));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(bytes) = feed_serializer.into_finalized() {
|
||||||
|
let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Actually make a note of the new chain subsciption:
|
||||||
|
feed_conn_id_to_chain.insert(feed_conn_id, chain.label().into());
|
||||||
|
chain_to_feed_conn_ids.entry(chain.label().into()).or_default().insert(feed_conn_id);
|
||||||
|
},
|
||||||
|
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::SendFinality) => {
|
||||||
|
feed_conn_id_finality.insert(feed_conn_id);
|
||||||
|
},
|
||||||
|
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::NoMoreFinality) => {
|
||||||
feed_conn_id_finality.remove(&feed_conn_id);
|
feed_conn_id_finality.remove(&feed_conn_id);
|
||||||
// TODO: Do we care about the chain here?
|
|
||||||
},
|
},
|
||||||
ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Initialize { channel }) => {
|
ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Initialize { channel }) => {
|
||||||
shard_channels.insert(shard_conn_id, channel);
|
shard_channels.insert(shard_conn_id, channel);
|
||||||
},
|
},
|
||||||
ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Add { local_id, ip, node }) => {
|
ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Add { local_id, ip, node, genesis_hash }) => {
|
||||||
let global_node_id = to_global_node_id.assign_id((shard_conn_id, local_id));
|
// Get globalId from add_node and store that against shard/local_id.
|
||||||
|
|
||||||
// TODO: node_state.add_node. Every feed should know about node count changes.
|
// TODO: node_state.add_node. Every feed should know about node count changes.
|
||||||
},
|
},
|
||||||
@@ -201,11 +282,47 @@ impl Aggregator {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Update { local_id, payload }) => {
|
ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Update { local_id, payload }) => {
|
||||||
|
// TODO: Fill this all in...
|
||||||
let global_node_id = match to_global_node_id.get_id(&(shard_conn_id, local_id)) {
|
let global_node_id = match to_global_node_id.get_id(&(shard_conn_id, local_id)) {
|
||||||
Some(id) => id,
|
Some(id) => id,
|
||||||
None => continue
|
None => continue
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if let Some(block) = payload.best_block() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
match payload {
|
||||||
|
node::Payload::SystemInterval(system_interval) => {
|
||||||
|
|
||||||
|
},
|
||||||
|
node::Payload::AfgAuthoritySet(_) => {
|
||||||
|
|
||||||
|
},
|
||||||
|
node::Payload::AfgFinalized(_) => {
|
||||||
|
|
||||||
|
},
|
||||||
|
node::Payload::AfgReceivedPrecommit(_) => {
|
||||||
|
|
||||||
|
},
|
||||||
|
node::Payload::AfgReceivedPrevote(_) => {
|
||||||
|
|
||||||
|
},
|
||||||
|
// This message should have been handled before the payload made it this far:
|
||||||
|
node::Payload::SystemConnected(_) => {
|
||||||
|
unreachable!("SystemConnected message seen in Telemetry Core, but should have been handled in shard");
|
||||||
|
},
|
||||||
|
// The following messages aren't handled at the moment. List them explicitly so
|
||||||
|
// that we have to make an explicit choice for any new messages:
|
||||||
|
node::Payload::BlockImport(_) |
|
||||||
|
node::Payload::NotifyFinalized(_) |
|
||||||
|
node::Payload::AfgReceivedCommit(_) |
|
||||||
|
node::Payload::TxPoolImport |
|
||||||
|
node::Payload::AfgFinalizedBlocksUpTo |
|
||||||
|
node::Payload::AuraPreSealedBlock |
|
||||||
|
node::Payload::PreparedBlockForProposing => {},
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: node_state.update_node, then handle returned diffs
|
// TODO: node_state.update_node, then handle returned diffs
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|||||||
+16
-5
@@ -1,15 +1,14 @@
|
|||||||
//! This module provides the messages that will be
|
//! This module provides a way of encoding the various messages that we'll
|
||||||
//! sent to subscribing feeds.
|
//! send to subscribed feeds (browsers).
|
||||||
|
|
||||||
use serde::ser::{SerializeTuple, Serializer};
|
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use std::mem;
|
use std::mem;
|
||||||
|
|
||||||
use super::node::Node;
|
use crate::state::Node;
|
||||||
use serde_json::to_writer;
|
use serde_json::to_writer;
|
||||||
use common::types::{
|
use common::types::{
|
||||||
Address, BlockDetails, BlockHash, BlockNumber, NodeHardware, NodeIO, NodeId, NodeStats,
|
Address, BlockDetails, BlockHash, BlockNumber, NodeHardware, NodeIO, NodeId, NodeStats,
|
||||||
Timestamp, NodeDetails,
|
Timestamp
|
||||||
};
|
};
|
||||||
|
|
||||||
pub trait FeedMessage {
|
pub trait FeedMessage {
|
||||||
@@ -65,6 +64,8 @@ impl FeedMessageSerializer {
|
|||||||
let _ = to_writer(&mut self.buffer, value);
|
let _ = to_writer(&mut self.buffer, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Return the bytes we've serialized so far and prepare a new buffer. If you're
|
||||||
|
/// finished serializing data, prefer [`FeedMessageSerializer::into_finalized`]
|
||||||
pub fn finalize(&mut self) -> Option<Vec<u8>> {
|
pub fn finalize(&mut self) -> Option<Vec<u8>> {
|
||||||
if self.buffer.is_empty() {
|
if self.buffer.is_empty() {
|
||||||
return None;
|
return None;
|
||||||
@@ -76,6 +77,16 @@ impl FeedMessageSerializer {
|
|||||||
|
|
||||||
Some(bytes)
|
Some(bytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Return the bytes that we've serialized so far, consuming the serializer.
|
||||||
|
pub fn into_finalized(mut self) -> Option<Vec<u8>> {
|
||||||
|
if self.buffer.is_empty() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.buffer.push(b']');
|
||||||
|
Some(self.buffer)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
macro_rules! actions {
|
macro_rules! actions {
|
||||||
@@ -1,5 +1,6 @@
|
|||||||
mod aggregator;
|
mod aggregator;
|
||||||
mod state;
|
mod state;
|
||||||
|
mod feed_message;
|
||||||
|
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
@@ -193,8 +194,8 @@ async fn handle_shard_websocket_connection<S>(mut websocket: ws::WebSocket, mut
|
|||||||
|
|
||||||
// Convert and send to the aggregator:
|
// Convert and send to the aggregator:
|
||||||
let aggregator_msg = match msg {
|
let aggregator_msg = match msg {
|
||||||
internal_messages::FromShardAggregator::AddNode { ip, node, local_id } => {
|
internal_messages::FromShardAggregator::AddNode { ip, node, local_id, genesis_hash } => {
|
||||||
FromShardWebsocket::Add { ip, node, local_id }
|
FromShardWebsocket::Add { ip, node, genesis_hash, local_id }
|
||||||
},
|
},
|
||||||
internal_messages::FromShardAggregator::UpdateNode { payload, local_id } => {
|
internal_messages::FromShardAggregator::UpdateNode { payload, local_id } => {
|
||||||
FromShardWebsocket::Update { local_id, payload }
|
FromShardWebsocket::Update { local_id, payload }
|
||||||
@@ -241,7 +242,13 @@ async fn handle_feed_websocket_connection<S>(mut websocket: ws::WebSocket, mut t
|
|||||||
None => break
|
None => break
|
||||||
};
|
};
|
||||||
|
|
||||||
println!("TODO: encode message and send down feed websocket: {:?}", msg);
|
// Send messages to the client (currently the only message is
|
||||||
|
// pre-serialized bytes that we send as binary):
|
||||||
|
match msg {
|
||||||
|
ToFeedWebsocket::Bytes(bytes) => {
|
||||||
|
let _ = websocket.send(ws::Message::binary(bytes)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// FRONTEND -> AGGREGATOR
|
// FRONTEND -> AGGREGATOR
|
||||||
msg = websocket.next() => {
|
msg = websocket.next() => {
|
||||||
|
|||||||
@@ -1,3 +1,59 @@
|
|||||||
pub struct Chain {
|
use std::sync::Arc;
|
||||||
|
use std::collections::{ HashSet, HashMap };
|
||||||
|
use common::types::{ BlockHash };
|
||||||
|
use common::internal_messages::{ GlobalId };
|
||||||
|
use super::node::Node;
|
||||||
|
use common::types::{Block, NodeDetails, NodeId, NodeLocation, Timestamp};
|
||||||
|
use common::util::{now, DenseMap, NumStats};
|
||||||
|
use common::node::Payload;
|
||||||
|
use std::iter::IntoIterator;
|
||||||
|
|
||||||
|
pub type ChainId = usize;
|
||||||
|
pub type Label = Arc<str>;
|
||||||
|
|
||||||
|
pub struct Chain {
|
||||||
|
/// Label of this chain, along with count of nodes that use this label
|
||||||
|
label: (Label, usize),
|
||||||
|
/// Chain genesis hash
|
||||||
|
genesis_hash: BlockHash,
|
||||||
|
/// Set of nodes that are in this chain
|
||||||
|
nodes: HashSet<GlobalId>,
|
||||||
|
/// Best block
|
||||||
|
best: Block,
|
||||||
|
/// Finalized block
|
||||||
|
finalized: Block,
|
||||||
|
/// Block times history, stored so we can calculate averages
|
||||||
|
block_times: NumStats<u64>,
|
||||||
|
/// Calculated average block time
|
||||||
|
average_block_time: Option<u64>,
|
||||||
|
/// When the best block first arrived
|
||||||
|
timestamp: Option<Timestamp>,
|
||||||
|
/// Some nodes might manifest a different label, note them here
|
||||||
|
labels: HashMap<Label, usize>,
|
||||||
|
/// How many nodes are allowed in this chain
|
||||||
|
max_nodes: usize
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Chain {
|
||||||
|
pub fn label(&self) -> &str {
|
||||||
|
&self.label.0
|
||||||
|
}
|
||||||
|
pub fn node_count(&self) -> usize {
|
||||||
|
self.nodes.len()
|
||||||
|
}
|
||||||
|
pub fn node_ids(&self) -> impl Iterator<Item=GlobalId> + '_ {
|
||||||
|
self.nodes.iter().copied()
|
||||||
|
}
|
||||||
|
pub fn best_block(&self) -> &Block {
|
||||||
|
&self.best
|
||||||
|
}
|
||||||
|
pub fn timestamp(&self) -> Timestamp {
|
||||||
|
self.timestamp.unwrap_or(0)
|
||||||
|
}
|
||||||
|
pub fn average_block_time(&self) -> Option<u64> {
|
||||||
|
self.average_block_time
|
||||||
|
}
|
||||||
|
pub fn finalized_block(&self) -> &Block {
|
||||||
|
&self.finalized
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -1,7 +1,9 @@
|
|||||||
mod node;
|
mod node;
|
||||||
mod chain;
|
mod chain;
|
||||||
mod feed_message;
|
// mod feed_message;
|
||||||
|
// mod diff;
|
||||||
|
|
||||||
mod state;
|
mod state;
|
||||||
|
|
||||||
pub use state::State;
|
pub use state::State;
|
||||||
|
pub use node::Node;
|
||||||
@@ -1,14 +1,129 @@
|
|||||||
use super::chain::Chain;
|
use std::sync::Arc;
|
||||||
use std::collections::HashMap;
|
use std::collections::{ HashSet, HashMap };
|
||||||
|
use common::types::{ BlockHash };
|
||||||
|
use common::internal_messages::{ GlobalId };
|
||||||
|
use super::node::Node;
|
||||||
|
use once_cell::sync::Lazy;
|
||||||
|
use common::types::{Block, NodeDetails, NodeId, NodeLocation, Timestamp};
|
||||||
|
use common::util::{now, DenseMap, NumStats};
|
||||||
|
use common::node::Payload;
|
||||||
|
use std::iter::IntoIterator;
|
||||||
|
|
||||||
|
use super::chain::Chain;
|
||||||
|
|
||||||
|
pub type ChainId = usize;
|
||||||
|
pub type Label = Arc<str>;
|
||||||
|
|
||||||
|
/// Our state constains node and chain information
|
||||||
pub struct State {
|
pub struct State {
|
||||||
chains: HashMap<Box<str>, Chain>
|
chains: DenseMap<Chain>,
|
||||||
|
nodes: HashMap<GlobalId, Node>,
|
||||||
|
chains_by_genesis_hash: HashMap<BlockHash, ChainId>,
|
||||||
|
chains_by_label: HashMap<Label, ChainId>,
|
||||||
|
/// Denylist for networks we do not want to allow connecting.
|
||||||
|
denylist: HashSet<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Labels of chains we consider "first party". These chains allow any
|
||||||
|
/// number of nodes to connect.
|
||||||
|
static FIRST_PARTY_NETWORKS: Lazy<HashSet<&'static str>> = Lazy::new(|| {
|
||||||
|
let mut set = HashSet::new();
|
||||||
|
set.insert("Polkadot");
|
||||||
|
set.insert("Kusama");
|
||||||
|
set.insert("Westend");
|
||||||
|
set.insert("Rococo");
|
||||||
|
set
|
||||||
|
});
|
||||||
|
|
||||||
|
/// Max number of nodes allowed to connect to the telemetry server.
|
||||||
|
const THIRD_PARTY_NETWORKS_MAX_NODES: usize = 500;
|
||||||
|
|
||||||
|
/// Adding a node to a chain leads to this result:
|
||||||
|
pub enum AddNodeResult {
|
||||||
|
/// The chain is on the "deny list", so we can't add the node
|
||||||
|
ChainOnDenyList,
|
||||||
|
/// The chain is over quota (too many nodes connected), so can't add the node
|
||||||
|
ChainOverQuota,
|
||||||
|
/// The node was added to the chain
|
||||||
|
NodeAddedToChain(NodeAddedToChain)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct NodeAddedToChain {
|
||||||
|
/// The label for the chain (which may have changed as a result of adding the node):
|
||||||
|
chain_label: Arc<str>,
|
||||||
|
/// Has the chain label been updated?
|
||||||
|
has_chain_label_changed: bool,
|
||||||
|
// How many nodes now exist in the chain?
|
||||||
|
chain_node_count: usize
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct RemoveNodeResult {
|
||||||
|
/// How many nodes remain on the chain (0 if the chain was removed):
|
||||||
|
chain_node_count: usize
|
||||||
}
|
}
|
||||||
|
|
||||||
impl State {
|
impl State {
|
||||||
pub fn new() -> State {
|
pub fn new<T: IntoIterator<Item=String>>(denylist: T) -> State {
|
||||||
State {
|
State {
|
||||||
chains: HashMap::new()
|
chains: DenseMap::new(),
|
||||||
|
nodes: HashMap::new(),
|
||||||
|
chains_by_genesis_hash: HashMap::new(),
|
||||||
|
chains_by_label: HashMap::new(),
|
||||||
|
denylist: denylist.into_iter().collect()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn iter_chains(&self) -> impl Iterator<Item=&Chain> {
|
||||||
|
self.chains
|
||||||
|
.iter()
|
||||||
|
.map(|(_,chain)| chain)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_chain_by_label(&self, label: &str) -> Option<&Chain> {
|
||||||
|
self.chains_by_label
|
||||||
|
.get(label)
|
||||||
|
.and_then(|chain_id| self.chains.get(*chain_id))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_nodes_in_chain<'s>(&'s self, chain: &'s Chain) -> impl Iterator<Item=(GlobalId,&Node)> {
|
||||||
|
chain.node_ids()
|
||||||
|
.filter_map(move |id| self.nodes.get(&id).map(|node| (id, node)))
|
||||||
|
}
|
||||||
|
|
||||||
|
// /// Add a new node to our state.
|
||||||
|
// pub fn add_node(&mut self, id: GlobalId, genesis_hash: BlockHash, node: &NodeDetails) -> AddNodeResult {
|
||||||
|
// if self.denylist.contains(&*node.chain) {
|
||||||
|
// return AddNodeResult::ChainOnDenyList;
|
||||||
|
// }
|
||||||
|
// let chain_id = self.chains.get_or_create(genesis_hash, &node.chain);
|
||||||
|
|
||||||
|
// return Ok(())
|
||||||
|
// }
|
||||||
|
|
||||||
|
// /// Remove a node from our state.
|
||||||
|
// pub fn remove_node(&mut self, id: GlobalId) -> RemoveNodeResult {
|
||||||
|
|
||||||
|
// }
|
||||||
|
|
||||||
|
// /// Update a node with new data. This needs breaking down into parts so
|
||||||
|
// /// that we can emit a useful result in each case to inform the aggregator
|
||||||
|
// /// what messages it needs to emit.
|
||||||
|
// pub fn update_node(&mut self, id: GlobalId, payload: Payload) {
|
||||||
|
|
||||||
|
// }
|
||||||
|
|
||||||
|
// fn get_or_create_chain(genesis_hash: BlockHash, chain: &str) -> ChainId {
|
||||||
|
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// First party networks (Polkadot, Kusama etc) are allowed any number of nodes.
|
||||||
|
/// Third party networks are allowed `THIRD_PARTY_NETWORKS_MAX_NODES` nodes and
|
||||||
|
/// no more.
|
||||||
|
fn max_nodes(label: &str) -> usize {
|
||||||
|
if FIRST_PARTY_NETWORKS.contains(label) {
|
||||||
|
usize::MAX
|
||||||
|
} else {
|
||||||
|
THIRD_PARTY_NETWORKS_MAX_NODES
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user