Lots more refactoring, finish add node (and almost the location updating)

This commit is contained in:
James Wilson
2021-06-23 18:03:11 +01:00
parent 2db2677217
commit 47c12ce210
16 changed files with 1297 additions and 459 deletions
+3
View File
@@ -14,7 +14,10 @@ hex = "0.4.3"
http = "0.2.4"
log = "0.4.14"
once_cell = "1.8.0"
parking_lot = "0.11.1"
primitive-types = { version = "0.9.0", features = ["serde"] }
reqwest = { version = "0.11.4", features = ["json"] }
rustc-hash = "1.1.0"
serde = { version = "1.0.126", features = ["derive"] }
serde_json = "1.0.64"
simple_logger = "1.11.0"
-381
View File
@@ -1,381 +0,0 @@
use common::{
internal_messages::{GlobalId, LocalId},
node,
util::now
};
use bimap::BiMap;
use std::{str::FromStr, sync::Arc};
use std::sync::atomic::AtomicU64;
use futures::channel::{ mpsc, oneshot };
use futures::{ Sink, SinkExt, StreamExt };
use tokio::net::TcpStream;
use tokio_util::compat::{ TokioAsyncReadCompatExt };
use std::collections::{ HashMap, HashSet };
use crate::state::State;
use crate::feed_message::{ self, FeedMessageSerializer };
/// A unique Id is assigned per websocket connection (or more accurately,
/// per feed socket and per shard socket). This can be combined with the
/// [`LocalId`] of messages to give us a global ID.
type ConnId = u64;
/// Incoming messages come via subscriptions, and end up looking like this.
#[derive(Debug)]
enum ToAggregator {
FromShardWebsocket(ConnId, FromShardWebsocket),
FromFeedWebsocket(ConnId, FromFeedWebsocket),
}
/// An incoming shard connection can send these messages to the aggregator.
#[derive(Debug)]
pub enum FromShardWebsocket {
/// When the socket is opened, it'll send this first
/// so that we have a way to communicate back to it.
Initialize {
channel: mpsc::Sender<ToShardWebsocket>,
},
/// Tell the aggregator about a new node.
Add {
local_id: LocalId,
ip: Option<std::net::IpAddr>,
node: common::types::NodeDetails,
genesis_hash: common::types::BlockHash
},
/// Update/pass through details about a node.
Update {
local_id: LocalId,
payload: node::Payload
},
/// Tell the aggregator that a node has been removed when it disconnects.
Remove {
local_id: LocalId,
},
/// The shard is disconnected.
Disconnected
}
/// The aggregator can these messages back to a shard connection.
#[derive(Debug)]
pub enum ToShardWebsocket {
/// Mute messages to the core by passing the shard-local ID of them.
Mute {
local_id: LocalId
}
}
/// An incoming feed connection can send these messages to the aggregator.
#[derive(Debug)]
pub enum FromFeedWebsocket {
/// When the socket is opened, it'll send this first
/// so that we have a way to communicate back to it.
/// Unbounded so that slow feeds don't block aggregato
/// progress.
Initialize {
channel: mpsc::UnboundedSender<ToFeedWebsocket>,
},
/// The feed can subscribe to a chain to receive
/// messages relating to it.
Subscribe {
chain: Box<str>
},
/// The feed wants finality info for the chain, too.
SendFinality,
/// The feed doesn't want any more finality info for the chain.
NoMoreFinality,
/// An explicit ping message.
Ping {
chain: Box<str>
},
/// The feed is disconnected.
Disconnected
}
// The frontend sends text based commands; parse them into these messages:
impl FromStr for FromFeedWebsocket {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let (cmd, chain) = match s.find(':') {
Some(idx) => (&s[..idx], s[idx+1..].into()),
None => return Err(anyhow::anyhow!("Expecting format `CMD:CHAIN_NAME`"))
};
match cmd {
"ping" => Ok(FromFeedWebsocket::Ping { chain }),
"subscribe" => Ok(FromFeedWebsocket::Subscribe { chain }),
"send-finality" => Ok(FromFeedWebsocket::SendFinality),
"no-more-finality" => Ok(FromFeedWebsocket::NoMoreFinality),
_ => return Err(anyhow::anyhow!("Command {} not recognised", cmd))
}
}
}
/// The aggregator can these messages back to a feed connection.
#[derive(Debug)]
pub enum ToFeedWebsocket {
Bytes(Vec<u8>)
}
#[derive(Clone)]
pub struct Aggregator(Arc<AggregatorInternal>);
struct AggregatorInternal {
/// Shards that connect are each assigned a unique connection ID.
/// This helps us know who to send messages back to (especially in
/// conjunction with the [`LocalId`] that messages will come with).
shard_conn_id: AtomicU64,
/// Feeds that connect have their own unique connection ID, too.
feed_conn_id: AtomicU64,
/// Send messages in to the aggregator from the outside via this. This is
/// stored here so that anybody holding an `Aggregator` handle can
/// make use of it.
tx_to_aggregator: mpsc::Sender<ToAggregator>
}
impl Aggregator {
/// Spawn a new Aggregator. This connects to the telemetry backend
pub async fn spawn(denylist: Vec<String>) -> anyhow::Result<Aggregator> {
let (tx_to_aggregator, rx_from_external) = mpsc::channel(10);
// Handle any incoming messages in our handler loop:
tokio::spawn(Aggregator::handle_messages(rx_from_external, denylist));
// Return a handle to our aggregator:
Ok(Aggregator(Arc::new(AggregatorInternal {
shard_conn_id: AtomicU64::new(1),
feed_conn_id: AtomicU64::new(1),
tx_to_aggregator,
})))
}
// This is spawned into a separate task and handles any messages coming
// in to the aggregator. If nobody is tolding the tx side of the channel
// any more, this task will gracefully end.
async fn handle_messages(mut rx_from_external: mpsc::Receiver<ToAggregator>, denylist: Vec<String>) {
let mut node_state = State::new(denylist);
// Maintain mappings from the shard connection ID and local ID of messages to a global ID
// that uniquely identifies nodes in our node state.
let mut global_ids: BiMap<GlobalId, (u64, LocalId)> = BiMap::new();
// Keep track of channels to communicate with feeds and shards:
let mut feed_channels = HashMap::new();
let mut shard_channels = HashMap::new();
// What chains have our feeds subscribed to (one at a time at the mo)?
// Both of these need to be kept in sync (should move to own struct eventually).
let mut feed_conn_id_to_chain: HashMap<ConnId, Box<str>> = HashMap::new();
let mut chain_to_feed_conn_ids: HashMap<Box<str>, HashSet<ConnId>> = HashMap::new();
// Which feeds want finality info too?
let mut feed_conn_id_finality: HashSet<ConnId> = HashSet::new();
// Now, loop and receive messages to handle.
while let Some(msg) = rx_from_external.next().await {
match msg {
// FROM FEED
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Initialize { mut channel }) => {
feed_channels.insert(feed_conn_id, channel.clone());
// Tell the new feed subscription some basic things to get it going:
let mut feed_serializer = FeedMessageSerializer::new();
feed_serializer.push(feed_message::Version(31));
for chain in node_state.iter_chains() {
feed_serializer.push(feed_message::AddedChain(
chain.label(),
chain.node_count()
));
}
// Send this to the channel that subscribed:
if let Some(bytes) = feed_serializer.into_finalized() {
let _ = channel.send(ToFeedWebsocket::Bytes(bytes)).await;
}
},
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Ping { chain }) => {
let feed_channel = match feed_channels.get_mut(&feed_conn_id) {
Some(chan) => chan,
None => continue
};
// Pong!
let mut feed_serializer = FeedMessageSerializer::new();
feed_serializer.push(feed_message::Pong(&chain));
if let Some(bytes) = feed_serializer.into_finalized() {
let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await;
}
},
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Subscribe { chain }) => {
let feed_channel = match feed_channels.get_mut(&feed_conn_id) {
Some(chan) => chan,
None => continue
};
// Unsubscribe from previous chain if subscribed to one:
let old_chain_label = feed_conn_id_to_chain.remove(&feed_conn_id);
if let Some(old_chain_label) = &old_chain_label {
if let Some(map) = chain_to_feed_conn_ids.get_mut(old_chain_label) {
map.remove(&feed_conn_id);
}
}
// Untoggle request for finality feeds:
feed_conn_id_finality.remove(&feed_conn_id);
// Get the chain we're subscribing to, ignoring the rest if it doesn't exist.
let chain = match node_state.get_chain_by_label(&chain) {
Some(chain) => chain,
None => continue
};
// Send messages to the feed about the new chain:
let mut feed_serializer = FeedMessageSerializer::new();
if let Some(old_chain_label) = old_chain_label {
feed_serializer.push(feed_message::UnsubscribedFrom(&old_chain_label));
}
feed_serializer.push(feed_message::SubscribedTo(chain.label()));
feed_serializer.push(feed_message::TimeSync(now()));
feed_serializer.push(feed_message::BestBlock (
chain.best_block().height,
chain.timestamp(),
chain.average_block_time()
));
feed_serializer.push(feed_message::BestFinalized (
chain.finalized_block().height,
chain.finalized_block().hash
));
for (idx, (gid, node)) in node_state.get_nodes_in_chain(chain).enumerate() {
// Send subscription confirmation and chain head before doing all the nodes,
// and continue sending batches of 32 nodes a time over the wire subsequently
if idx % 32 == 0 {
if let Some(bytes) = feed_serializer.finalize() {
let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await;
}
}
feed_serializer.push(feed_message::AddedNode(gid, node));
feed_serializer.push(feed_message::FinalizedBlock(
gid,
node.finalized().height,
node.finalized().hash,
));
if node.stale() {
feed_serializer.push(feed_message::StaleNode(gid));
}
}
if let Some(bytes) = feed_serializer.into_finalized() {
let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await;
}
// Actually make a note of the new chain subsciption:
feed_conn_id_to_chain.insert(feed_conn_id, chain.label().into());
chain_to_feed_conn_ids.entry(chain.label().into()).or_default().insert(feed_conn_id);
},
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::SendFinality) => {
feed_conn_id_finality.insert(feed_conn_id);
},
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::NoMoreFinality) => {
feed_conn_id_finality.remove(&feed_conn_id);
},
ToAggregator::FromFeedWebsocket(feed_conn_id, FromFeedWebsocket::Disconnected) => {
// The feed has disconnected; clean up references to it:
if let Some(chain) = feed_conn_id_to_chain.remove(&feed_conn_id) {
chain_to_feed_conn_ids.remove(&chain);
}
feed_channels.remove(&feed_conn_id);
feed_conn_id_finality.remove(&feed_conn_id);
},
// FROM SHARD
ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Initialize { channel }) => {
shard_channels.insert(shard_conn_id, channel);
},
ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Add { local_id, ip, node, genesis_hash }) => {
// Get globalId from add_node and store that against shard/local_id.
// TODO: node_state.add_node. Every feed should know about node count changes.
},
ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Remove { local_id }) => {
if let Some(id) = global_ids.remove_by_right(&(shard_conn_id, local_id)) {
// TODO: node_state.remove_node, Every feed should know about node count changes.
}
},
ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Update { local_id, payload }) => {
// TODO: Fill this all in...
let global_node_id = match global_ids.get_by_right(&(shard_conn_id, local_id)) {
Some(id) => id,
None => continue
};
if let Some(block) = payload.best_block() {
}
match payload {
node::Payload::SystemInterval(system_interval) => {
},
node::Payload::AfgAuthoritySet(_) => {
},
node::Payload::AfgFinalized(_) => {
},
node::Payload::AfgReceivedPrecommit(_) => {
},
node::Payload::AfgReceivedPrevote(_) => {
},
// This message should have been handled before the payload made it this far:
node::Payload::SystemConnected(_) => {
unreachable!("SystemConnected message seen in Telemetry Core, but should have been handled in shard");
},
// The following messages aren't handled at the moment. List them explicitly so
// that we have to make an explicit choice for any new messages:
node::Payload::BlockImport(_) |
node::Payload::NotifyFinalized(_) |
node::Payload::AfgReceivedCommit(_) |
node::Payload::TxPoolImport |
node::Payload::AfgFinalizedBlocksUpTo |
node::Payload::AuraPreSealedBlock |
node::Payload::PreparedBlockForProposing => {},
}
// TODO: node_state.update_node, then handle returned diffs
},
ToAggregator::FromShardWebsocket(shard_conn_id, FromShardWebsocket::Disconnected) => {
// The shard has disconnected; remove the shard channel, but also
// remove any nodes associated with the shard, firing the relevant feed messages.
}
}
}
}
/// Return a sink that a shard can send messages into to be handled by the aggregator.
pub fn subscribe_shard(&self) -> impl Sink<FromShardWebsocket, Error = anyhow::Error> + Unpin {
// Assign a unique aggregator-local ID to each connection that subscribes, and pass
// that along with every message to the aggregator loop:
let shard_conn_id: ConnId = self.0.shard_conn_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let tx_to_aggregator = self.0.tx_to_aggregator.clone();
// Calling `send` on this Sink requires Unpin. There may be a nicer way than this,
// but pinning by boxing is the easy solution for now:
Box::pin(tx_to_aggregator.with(move |msg| async move {
Ok(ToAggregator::FromShardWebsocket(shard_conn_id, msg))
}))
}
/// Return a sink that a feed can send messages into to be handled by the aggregator.
pub fn subscribe_feed(&self) -> impl Sink<FromFeedWebsocket, Error = anyhow::Error> + Unpin {
// Assign a unique aggregator-local ID to each connection that subscribes, and pass
// that along with every message to the aggregator loop:
let feed_conn_id: ConnId = self.0.feed_conn_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let tx_to_aggregator = self.0.tx_to_aggregator.clone();
// Calling `send` on this Sink requires Unpin. There may be a nicer way than this,
// but pinning by boxing is the easy solution for now:
Box::pin(tx_to_aggregator.with(move |msg| async move {
Ok(ToAggregator::FromFeedWebsocket(feed_conn_id, msg))
}))
}
}
@@ -0,0 +1,83 @@
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use futures::channel::mpsc;
use futures::{ Sink, SinkExt };
use super::inner_loop;
/// A unique Id is assigned per websocket connection (or more accurately,
/// per feed socket and per shard socket). This can be combined with the
/// [`LocalId`] of messages to give us a global ID.
type ConnId = u64;
#[derive(Clone)]
pub struct Aggregator(Arc<AggregatorInternal>);
struct AggregatorInternal {
/// Shards that connect are each assigned a unique connection ID.
/// This helps us know who to send messages back to (especially in
/// conjunction with the [`LocalId`] that messages will come with).
shard_conn_id: AtomicU64,
/// Feeds that connect have their own unique connection ID, too.
feed_conn_id: AtomicU64,
/// Send messages in to the aggregator from the outside via this. This is
/// stored here so that anybody holding an `Aggregator` handle can
/// make use of it.
tx_to_aggregator: mpsc::Sender<inner_loop::ToAggregator>
}
impl Aggregator {
/// Spawn a new Aggregator. This connects to the telemetry backend
pub async fn spawn(denylist: Vec<String>) -> anyhow::Result<Aggregator> {
let (tx_to_aggregator, rx_from_external) = mpsc::channel(10);
// Handle any incoming messages in our handler loop:
tokio::spawn(Aggregator::handle_messages(rx_from_external, tx_to_aggregator.clone(), denylist));
// Return a handle to our aggregator:
Ok(Aggregator(Arc::new(AggregatorInternal {
shard_conn_id: AtomicU64::new(1),
feed_conn_id: AtomicU64::new(1),
tx_to_aggregator,
})))
}
// This is spawned into a separate task and handles any messages coming
// in to the aggregator. If nobody is tolding the tx side of the channel
// any more, this task will gracefully end.
async fn handle_messages(
rx_from_external: mpsc::Receiver<inner_loop::ToAggregator>,
tx_to_aggregator: mpsc::Sender<inner_loop::ToAggregator>,
denylist: Vec<String>
) {
inner_loop::InnerLoop::new(rx_from_external, tx_to_aggregator, denylist).handle().await;
}
/// Return a sink that a shard can send messages into to be handled by the aggregator.
pub fn subscribe_shard(&self) -> impl Sink<inner_loop::FromShardWebsocket, Error = anyhow::Error> + Unpin {
// Assign a unique aggregator-local ID to each connection that subscribes, and pass
// that along with every message to the aggregator loop:
let shard_conn_id: ConnId = self.0.shard_conn_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let tx_to_aggregator = self.0.tx_to_aggregator.clone();
// Calling `send` on this Sink requires Unpin. There may be a nicer way than this,
// but pinning by boxing is the easy solution for now:
Box::pin(tx_to_aggregator.with(move |msg| async move {
Ok(inner_loop::ToAggregator::FromShardWebsocket(shard_conn_id, msg))
}))
}
/// Return a sink that a feed can send messages into to be handled by the aggregator.
pub fn subscribe_feed(&self) -> impl Sink<inner_loop::FromFeedWebsocket, Error = anyhow::Error> + Unpin {
// Assign a unique aggregator-local ID to each connection that subscribes, and pass
// that along with every message to the aggregator loop:
let feed_conn_id: ConnId = self.0.feed_conn_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let tx_to_aggregator = self.0.tx_to_aggregator.clone();
// Calling `send` on this Sink requires Unpin. There may be a nicer way than this,
// but pinning by boxing is the easy solution for now:
Box::pin(tx_to_aggregator.with(move |msg| async move {
Ok(inner_loop::ToAggregator::FromFeedWebsocket(feed_conn_id, msg))
}))
}
}
@@ -0,0 +1,206 @@
use std::net::Ipv4Addr;
use std::sync::Arc;
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use serde::Deserialize;
use futures::{Sink, SinkExt, StreamExt};
use futures::channel::mpsc;
use common::types::NodeLocation;
use tokio::sync::Semaphore;
/// The returned location is optional; it may be None if not found.
pub type Location = Option<Arc<NodeLocation>>;
/// This is responsible for taking an IP address and attempting
/// to find a geographical location from this
pub fn find_location<Id, R>(response_chan: R) -> mpsc::UnboundedSender<(Id, Ipv4Addr)>
where
R: Sink<(Id, Option<Arc<NodeLocation>>)> + Unpin + Send + Clone + 'static,
Id: Clone + Send + 'static
{
let (tx, mut rx) = mpsc::unbounded();
// cache entries
let mut cache: FxHashMap<Ipv4Addr, Option<Arc<NodeLocation>>> = FxHashMap::default();
// Default entry for localhost
cache.insert(
Ipv4Addr::new(127, 0, 0, 1),
Some(Arc::new(NodeLocation {
latitude: 52.516_6667,
longitude: 13.4,
city: "Berlin".into(),
})),
);
// Create a locator with our cache. This is used to obtain locations.
let locator = Locator::new(cache);
// Spawn a loop to handle location requests
tokio::spawn(async move {
// Allow 4 requests at a time. acquiring a token will block while the
// number of concurrent location requests is more than this.
let semaphore = Arc::new(Semaphore::new(4));
loop {
while let Some((id, ip_address)) = rx.next().await {
let permit = semaphore.clone().acquire_owned().await.unwrap();
let mut response_chan = response_chan.clone();
let locator = locator.clone();
// Once we have acquired our permit, spawn a task to avoid
// blocking this loop so that we can handle concurrent requests.
tokio::spawn(async move {
match locator.locate(ip_address).await {
Ok(loc) => {
let _ = response_chan.send((id,loc)).await;
},
Err(e) => {
log::debug!("GET error for ip location: {:?}", e);
}
};
// ensure permit is moved into task by dropping it explicitly:
drop(permit);
});
}
}
});
tx
}
/// This struct can be used to make location requests, given
/// an IPV4 address.
#[derive(Clone)]
struct Locator {
client: reqwest::Client,
cache: Arc<RwLock<FxHashMap<Ipv4Addr, Option<Arc<NodeLocation>>>>>,
}
impl Locator {
pub fn new(cache: FxHashMap<Ipv4Addr, Option<Arc<NodeLocation>>>) -> Self {
let client = reqwest::Client::new();
Locator {
client,
cache: Arc::new(RwLock::new(cache))
}
}
pub async fn locate(&self, ip: Ipv4Addr) -> Result<Option<Arc<NodeLocation>>, reqwest::Error> {
// Return location quickly if it's cached:
let cached_loc = {
let cache_reader = self.cache.read();
cache_reader.get(&ip).map(|o| o.clone())
};
if let Some(loc) = cached_loc {
return Ok(loc);
}
// Look it up via the location services if not cached:
let location = self.iplocate_ipapi_co(ip).await?;
let location = match location {
Some(location) => Ok(Some(location)),
None => self.iplocate_ipinfo_io(ip).await,
}?;
self.cache.write().insert(ip, location.clone());
Ok(location)
}
async fn iplocate_ipapi_co(&self, ip: Ipv4Addr) -> Result<Option<Arc<NodeLocation>>, reqwest::Error> {
let location = self
.query(&format!("https://ipapi.co/{}/json", ip))
.await?
.map(Arc::new);
Ok(location)
}
async fn iplocate_ipinfo_io(&self, ip: Ipv4Addr) -> Result<Option<Arc<NodeLocation>>, reqwest::Error> {
let location = self
.query(&format!("https://ipinfo.io/{}/json", ip))
.await?
.and_then(|loc: IPApiLocate| loc.into_node_location().map(Arc::new));
Ok(location)
}
async fn query<T>(&self, url: &str) -> Result<Option<T>, reqwest::Error>
where for<'de> T: Deserialize<'de>
{
match self.client.get(url).send().await?.json::<T>().await {
Ok(result) => Ok(Some(result)),
Err(err) => {
log::debug!("JSON error for ip location: {:?}", err);
Ok(None)
}
}
}
}
/// This is the format returned from ipinfo.co, so we do
/// a little conversion to get it into the shape we want.
#[derive(Deserialize)]
struct IPApiLocate {
city: Box<str>,
loc: Box<str>,
}
impl IPApiLocate {
fn into_node_location(self) -> Option<NodeLocation> {
let IPApiLocate { city, loc } = self;
let mut loc = loc.split(',').map(|n| n.parse());
let latitude = loc.next()?.ok()?;
let longitude = loc.next()?.ok()?;
// Guarantee that the iterator has been exhausted
if loc.next().is_some() {
return None;
}
Some(NodeLocation {
latitude,
longitude,
city,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn ipapi_locate_to_node_location() {
let ipapi = IPApiLocate {
loc: "12.5,56.25".into(),
city: "Foobar".into(),
};
let location = ipapi.into_node_location().unwrap();
assert_eq!(location.latitude, 12.5);
assert_eq!(location.longitude, 56.25);
assert_eq!(&*location.city, "Foobar");
}
#[test]
fn ipapi_locate_to_node_location_too_many() {
let ipapi = IPApiLocate {
loc: "12.5,56.25,1.0".into(),
city: "Foobar".into(),
};
let location = ipapi.into_node_location();
assert!(location.is_none());
}
}
@@ -0,0 +1,423 @@
use common::{
internal_messages::{
self,
LocalId,
MuteReason
},
node,
util::now
};
use bimap::BiMap;
use std::{iter::FromIterator, net::Ipv4Addr, str::FromStr};
use futures::channel::{ mpsc };
use futures::{ future, SinkExt, StreamExt };
use std::collections::{ HashMap, HashSet };
use crate::state::{ self, State, NodeId };
use crate::feed_message::{ self, FeedMessageSerializer };
use super::find_location::{ self, find_location };
/// A unique Id is assigned per websocket connection (or more accurately,
/// per feed socket and per shard socket). This can be combined with the
/// [`LocalId`] of messages to give us a global ID.
type ConnId = u64;
/// Incoming messages come via subscriptions, and end up looking like this.
#[derive(Clone,Debug)]
pub enum ToAggregator {
FromShardWebsocket(ConnId, FromShardWebsocket),
FromFeedWebsocket(ConnId, FromFeedWebsocket),
FromFindLocation(NodeId, find_location::Location)
}
/// An incoming shard connection can send these messages to the aggregator.
#[derive(Clone,Debug)]
pub enum FromShardWebsocket {
/// When the socket is opened, it'll send this first
/// so that we have a way to communicate back to it.
Initialize {
channel: mpsc::Sender<ToShardWebsocket>,
},
/// Tell the aggregator about a new node.
Add {
local_id: LocalId,
ip: Option<std::net::IpAddr>,
node: common::types::NodeDetails,
genesis_hash: common::types::BlockHash
},
/// Update/pass through details about a node.
Update {
local_id: LocalId,
payload: node::Payload
},
/// Tell the aggregator that a node has been removed when it disconnects.
Remove {
local_id: LocalId,
},
/// The shard is disconnected.
Disconnected
}
/// The aggregator can these messages back to a shard connection.
#[derive(Debug)]
pub enum ToShardWebsocket {
/// Mute messages to the core by passing the shard-local ID of them.
Mute {
local_id: LocalId,
reason: internal_messages::MuteReason
}
}
/// An incoming feed connection can send these messages to the aggregator.
#[derive(Clone,Debug)]
pub enum FromFeedWebsocket {
/// When the socket is opened, it'll send this first
/// so that we have a way to communicate back to it.
/// Unbounded so that slow feeds don't block aggregato
/// progress.
Initialize {
channel: mpsc::UnboundedSender<ToFeedWebsocket>,
},
/// The feed can subscribe to a chain to receive
/// messages relating to it.
Subscribe {
chain: Box<str>
},
/// The feed wants finality info for the chain, too.
SendFinality,
/// The feed doesn't want any more finality info for the chain.
NoMoreFinality,
/// An explicit ping message.
Ping {
chain: Box<str>
},
/// The feed is disconnected.
Disconnected
}
// The frontend sends text based commands; parse them into these messages:
impl FromStr for FromFeedWebsocket {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let (cmd, chain) = match s.find(':') {
Some(idx) => (&s[..idx], s[idx+1..].into()),
None => return Err(anyhow::anyhow!("Expecting format `CMD:CHAIN_NAME`"))
};
match cmd {
"ping" => Ok(FromFeedWebsocket::Ping { chain }),
"subscribe" => Ok(FromFeedWebsocket::Subscribe { chain }),
"send-finality" => Ok(FromFeedWebsocket::SendFinality),
"no-more-finality" => Ok(FromFeedWebsocket::NoMoreFinality),
_ => return Err(anyhow::anyhow!("Command {} not recognised", cmd))
}
}
}
/// The aggregator can these messages back to a feed connection.
#[derive(Clone,Debug)]
pub enum ToFeedWebsocket {
Bytes(Vec<u8>)
}
/// Instances of this are responsible for handling incoming and
/// outgoing messages in the main aggregator loop.
pub struct InnerLoop {
/// Messages from the outside world come into this:
rx_from_external: mpsc::Receiver<ToAggregator>,
/// The state of our chains and nodes lives here:
node_state: State,
/// We maintain a mapping between NodeId and ConnId+LocalId, so that we know
/// which messages are about which nodes.
node_ids: BiMap<NodeId, (ConnId, LocalId)>,
/// Keep track of how to send messages out to feeds.
feed_channels: HashMap<ConnId, mpsc::UnboundedSender<ToFeedWebsocket>>,
/// Keep track of how to send messages out to shards.
shard_channels: HashMap<ConnId, mpsc::Sender<ToShardWebsocket>>,
/// Which chain is a feed subscribed to?
feed_conn_id_to_chain: HashMap<ConnId, Box<str>>,
/// Which feeds are subscribed to a given chain (needs to stay in sync with above)?
chain_to_feed_conn_ids: HashMap<Box<str>, HashSet<ConnId>>,
/// These feeds want finality info, too.
feed_conn_id_finality: HashSet<ConnId>,
/// Send messages here to make location requests, which are sent back into the loop.
tx_to_locator: mpsc::UnboundedSender<(NodeId, Ipv4Addr)>
}
impl InnerLoop {
/// Create a new inner loop handler with the various state it needs.
pub fn new(
rx_from_external: mpsc::Receiver<ToAggregator>,
tx_to_aggregator: mpsc::Sender<ToAggregator>,
denylist: Vec<String>
) -> Self {
let tx_to_locator = find_location(tx_to_aggregator.with(|(node_id, msg)| {
future::ok::<_,mpsc::SendError>(ToAggregator::FromFindLocation(node_id, msg))
}));
InnerLoop {
rx_from_external,
node_state: State::new(denylist),
node_ids: BiMap::new(),
feed_channels: HashMap::new(),
shard_channels: HashMap::new(),
feed_conn_id_to_chain: HashMap::new(),
chain_to_feed_conn_ids: HashMap::new(),
feed_conn_id_finality: HashSet::new(),
tx_to_locator
}
}
/// Start handling and responding to incoming messages.
pub async fn handle(mut self) {
while let Some(msg) = self.rx_from_external.next().await {
match msg {
ToAggregator::FromFeedWebsocket(feed_conn_id, msg) => {
self.handle_from_feed(feed_conn_id, msg).await
},
ToAggregator::FromShardWebsocket(shard_conn_id, msg) => {
self.handle_from_shard(shard_conn_id, msg).await
},
ToAggregator::FromFindLocation(node_id, location) => {
self.handle_from_find_location(node_id, location).await
}
}
}
}
async fn handle_from_find_location(&mut self, node_id: NodeId, location: find_location::Location) {
// TODO: Update node location here
}
/// Handle messages coming from shards.
async fn handle_from_shard(&mut self, shard_conn_id: ConnId, msg: FromShardWebsocket) {
match msg {
FromShardWebsocket::Initialize { channel } => {
self.shard_channels.insert(shard_conn_id, channel);
},
FromShardWebsocket::Add { local_id, ip, node, genesis_hash } => {
match self.node_state.add_node(genesis_hash, node) {
state::AddNodeResult::ChainOnDenyList => {
if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) {
let _ = shard_conn.send(ToShardWebsocket::Mute {
local_id,
reason: MuteReason::ChainNotAllowed
}).await;
}
},
state::AddNodeResult::ChainOverQuota => {
if let Some(shard_conn) = self.shard_channels.get_mut(&shard_conn_id) {
let _ = shard_conn.send(ToShardWebsocket::Mute {
local_id,
reason: MuteReason::Overquota
}).await;
}
},
state::AddNodeResult::NodeAddedToChain(details) => {
let node_id = details.id;
// Note the ID so that we know what node other messages are referring to:
self.node_ids.insert(node_id, (shard_conn_id, local_id));
let mut feed_serializer = FeedMessageSerializer::new();
feed_serializer.push(feed_message::AddedNode(node_id, details.node));
let chain_label = details.chain.label().to_owned();
if let Some(bytes) = feed_serializer.into_finalized() {
self.broadcast_to_chain_feeds(
&chain_label,
ToFeedWebsocket::Bytes(bytes)
).await
}
// TODO: The node has been added. use it's IP to find a location.
},
}
},
FromShardWebsocket::Remove { local_id } => {
if let Some(node_id) = self.node_ids.remove_by_right(&(shard_conn_id, local_id)) {
// TODO: node_state.remove_node, Every feed should know about node count changes.
}
},
FromShardWebsocket::Update { local_id, payload } => {
// TODO: Fill this all in...
let node_id = match self.node_ids.get_by_right(&(shard_conn_id, local_id)) {
Some(id) => id,
None => return
};
if let Some(block) = payload.best_block() {
}
match payload {
node::Payload::SystemInterval(system_interval) => {
},
node::Payload::AfgAuthoritySet(_) => {
},
node::Payload::AfgFinalized(_) => {
},
node::Payload::AfgReceivedPrecommit(_) => {
},
node::Payload::AfgReceivedPrevote(_) => {
},
// This message should have been handled before the payload made it this far:
node::Payload::SystemConnected(_) => {
unreachable!("SystemConnected message seen in Telemetry Core, but should have been handled in shard");
},
// The following messages aren't handled at the moment. List them explicitly so
// that we have to make an explicit choice for any new messages:
node::Payload::BlockImport(_) |
node::Payload::NotifyFinalized(_) |
node::Payload::AfgReceivedCommit(_) |
node::Payload::TxPoolImport |
node::Payload::AfgFinalizedBlocksUpTo |
node::Payload::AuraPreSealedBlock |
node::Payload::PreparedBlockForProposing => {},
}
// TODO: node_state.update_node, then handle returned diffs
},
FromShardWebsocket::Disconnected => {
// The shard has disconnected; remove the shard channel, but also
// remove any nodes associated with the shard, firing the relevant feed messages.
}
}
}
/// Handle messages coming from feeds.
async fn handle_from_feed(&mut self, feed_conn_id: ConnId, msg: FromFeedWebsocket) {
match msg {
FromFeedWebsocket::Initialize { mut channel } => {
self.feed_channels.insert(feed_conn_id, channel.clone());
// Tell the new feed subscription some basic things to get it going:
let mut feed_serializer = FeedMessageSerializer::new();
feed_serializer.push(feed_message::Version(31));
for chain in self.node_state.iter_chains() {
feed_serializer.push(feed_message::AddedChain(
chain.label(),
chain.node_count()
));
}
// Send this to the channel that subscribed:
if let Some(bytes) = feed_serializer.into_finalized() {
let _ = channel.send(ToFeedWebsocket::Bytes(bytes)).await;
}
},
FromFeedWebsocket::Ping { chain } => {
let feed_channel = match self.feed_channels.get_mut(&feed_conn_id) {
Some(chan) => chan,
None => return
};
// Pong!
let mut feed_serializer = FeedMessageSerializer::new();
feed_serializer.push(feed_message::Pong(&chain));
if let Some(bytes) = feed_serializer.into_finalized() {
let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await;
}
},
FromFeedWebsocket::Subscribe { chain } => {
let feed_channel = match self.feed_channels.get_mut(&feed_conn_id) {
Some(chan) => chan,
None => return
};
// Unsubscribe from previous chain if subscribed to one:
let old_chain_label = self.feed_conn_id_to_chain.remove(&feed_conn_id);
if let Some(old_chain_label) = &old_chain_label {
if let Some(map) = self.chain_to_feed_conn_ids.get_mut(old_chain_label) {
map.remove(&feed_conn_id);
}
}
// Untoggle request for finality feeds:
self.feed_conn_id_finality.remove(&feed_conn_id);
// Get the chain we're subscribing to, ignoring the rest if it doesn't exist.
let chain = match self.node_state.get_chain_by_label(&chain) {
Some(chain) => chain,
None => return
};
// Send messages to the feed about the new chain:
let mut feed_serializer = FeedMessageSerializer::new();
if let Some(old_chain_label) = old_chain_label {
feed_serializer.push(feed_message::UnsubscribedFrom(&old_chain_label));
}
feed_serializer.push(feed_message::SubscribedTo(chain.label()));
feed_serializer.push(feed_message::TimeSync(now()));
feed_serializer.push(feed_message::BestBlock (
chain.best_block().height,
chain.timestamp(),
chain.average_block_time()
));
feed_serializer.push(feed_message::BestFinalized (
chain.finalized_block().height,
chain.finalized_block().hash
));
for (idx, (gid, node)) in chain.nodes().enumerate() {
// Send subscription confirmation and chain head before doing all the nodes,
// and continue sending batches of 32 nodes a time over the wire subsequently
if idx % 32 == 0 {
if let Some(bytes) = feed_serializer.finalize() {
let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await;
}
}
feed_serializer.push(feed_message::AddedNode(gid, node));
feed_serializer.push(feed_message::FinalizedBlock(
gid,
node.finalized().height,
node.finalized().hash,
));
if node.stale() {
feed_serializer.push(feed_message::StaleNode(gid));
}
}
if let Some(bytes) = feed_serializer.into_finalized() {
let _ = feed_channel.send(ToFeedWebsocket::Bytes(bytes)).await;
}
// Actually make a note of the new chain subsciption:
self.feed_conn_id_to_chain.insert(feed_conn_id, chain.label().into());
self.chain_to_feed_conn_ids.entry(chain.label().into()).or_default().insert(feed_conn_id);
},
FromFeedWebsocket::SendFinality => {
self.feed_conn_id_finality.insert(feed_conn_id);
},
FromFeedWebsocket::NoMoreFinality => {
self.feed_conn_id_finality.remove(&feed_conn_id);
},
FromFeedWebsocket::Disconnected => {
// The feed has disconnected; clean up references to it:
if let Some(chain) = self.feed_conn_id_to_chain.remove(&feed_conn_id) {
self.chain_to_feed_conn_ids.remove(&chain);
}
self.feed_channels.remove(&feed_conn_id);
self.feed_conn_id_finality.remove(&feed_conn_id);
},
}
}
/// Send a message to all chain feeds.
async fn broadcast_to_chain_feeds(&mut self, chain: &str, message: ToFeedWebsocket) {
if let Some(feeds) = self.chain_to_feed_conn_ids.get(chain) {
for &feed_id in feeds {
// How much faster would it be if we processed these in parallel?
if let Some(chan) = self.feed_channels.get_mut(&feed_id) {
chan.send(message.clone()).await;
}
}
}
}
}
+8
View File
@@ -0,0 +1,8 @@
mod aggregator;
mod inner_loop;
mod find_location;
// Expose the various message types that can be worked with externally:
pub use inner_loop::{ FromFeedWebsocket, FromShardWebsocket, ToFeedWebsocket, ToShardWebsocket };
pub use aggregator::*;
+2 -2
View File
@@ -151,8 +151,8 @@ async fn handle_shard_websocket_connection<S>(mut websocket: ws::WebSocket, mut
};
let internal_msg = match msg {
ToShardWebsocket::Mute { local_id } => {
internal_messages::FromTelemetryCore::Mute { local_id }
ToShardWebsocket::Mute { local_id, reason } => {
internal_messages::FromTelemetryCore::Mute { local_id, reason }
}
};
+83 -17
View File
@@ -1,23 +1,25 @@
use std::sync::Arc;
use std::collections::{ HashSet, HashMap };
use common::types::{ BlockHash };
use common::internal_messages::{ GlobalId };
use super::node::Node;
use common::types::{Block, NodeDetails, NodeId, NodeLocation, Timestamp};
use common::types::{Block, NodeDetails, NodeLocation, Timestamp};
use common::util::{now, DenseMap, NumStats};
use common::most_seen::{ MostSeen, self };
use common::node::Payload;
use std::iter::IntoIterator;
use once_cell::sync::Lazy;
use super::node::Node;
use super::NodeId;
pub type ChainId = usize;
pub type Label = Arc<str>;
pub type Label = Box<str>;
pub struct Chain {
/// Label of this chain, along with count of nodes that use this label
label: (Label, usize),
/// Chain genesis hash
genesis_hash: BlockHash,
/// Labels that nodes use for this chain. We keep track of
/// the most commonly used label as nodes are added/removed.
labels: MostSeen<Label>,
/// Set of nodes that are in this chain
nodes: HashSet<GlobalId>,
nodes: HashMap<NodeId, Node>,
/// Best block
best: Block,
/// Finalized block
@@ -27,22 +29,75 @@ pub struct Chain {
/// Calculated average block time
average_block_time: Option<u64>,
/// When the best block first arrived
timestamp: Option<Timestamp>,
/// Some nodes might manifest a different label, note them here
labels: HashMap<Label, usize>,
/// How many nodes are allowed in this chain
max_nodes: usize
timestamp: Option<Timestamp>
}
pub enum AddNodeResult {
Overquota,
Added {
chain_renamed: bool
}
}
/// Labels of chains we consider "first party". These chains allow any
/// number of nodes to connect.
static FIRST_PARTY_NETWORKS: Lazy<HashSet<&'static str>> = Lazy::new(|| {
let mut set = HashSet::new();
set.insert("Polkadot");
set.insert("Kusama");
set.insert("Westend");
set.insert("Rococo");
set
});
/// Max number of nodes allowed to connect to the telemetry server.
const THIRD_PARTY_NETWORKS_MAX_NODES: usize = 500;
impl Chain {
/// Create a new chain with an initial label.
pub fn new(label: Label) -> Self {
Chain {
labels: MostSeen::new(label),
nodes: HashMap::new(),
best: Block::zero(),
finalized: Block::zero(),
block_times: NumStats::new(50),
average_block_time: None,
timestamp: None
}
}
/// Can we add a node? If not, it's because the chain is at its quota.
pub fn can_add_node(&self) -> bool {
// Dynamically determine the max nodes based on the most common
// label so far, in case it changes to something with a different limit.
self.nodes.len() < max_nodes(self.labels.best())
}
/// Assign a node to this chain. If the function returns false, it
/// means that the node could not be added as we're at quota.
pub fn add_node(&mut self, node_id: NodeId, node_details: NodeDetails) -> AddNodeResult {
if !self.can_add_node() {
return AddNodeResult::Overquota
}
let label_result = self.labels.insert(&node_details.chain);
let new_node = Node::new(node_details);
self.nodes.insert(node_id, new_node);
AddNodeResult::Added {
chain_renamed: label_result.has_changed()
}
}
pub fn get_node(&self, node_id: NodeId) -> Option<&Node> {
self.nodes.get(&node_id)
}
pub fn label(&self) -> &str {
&self.label.0
&self.labels.best()
}
pub fn node_count(&self) -> usize {
self.nodes.len()
}
pub fn node_ids(&self) -> impl Iterator<Item=GlobalId> + '_ {
self.nodes.iter().copied()
pub fn nodes(&self) -> impl Iterator<Item=(NodeId, &Node)> + '_ {
self.nodes.iter().map(|(id, node)| (*id, node))
}
pub fn best_block(&self) -> &Block {
&self.best
@@ -56,4 +111,15 @@ impl Chain {
pub fn finalized_block(&self) -> &Block {
&self.finalized
}
}
/// First party networks (Polkadot, Kusama etc) are allowed any number of nodes.
/// Third party networks are allowed `THIRD_PARTY_NETWORKS_MAX_NODES` nodes and
/// no more.
fn max_nodes(label: &str) -> usize {
if FIRST_PARTY_NETWORKS.contains(label) {
usize::MAX
} else {
THIRD_PARTY_NETWORKS_MAX_NODES
}
}
+2 -4
View File
@@ -1,9 +1,7 @@
mod node;
mod chain;
// mod feed_message;
// mod diff;
mod state;
pub use state::State;
pub use node::Node;
pub use node::Node;
pub use state::*;
+54 -48
View File
@@ -1,60 +1,48 @@
use std::sync::Arc;
use std::collections::{ HashSet, HashMap };
use common::types::{ BlockHash };
use common::internal_messages::{ GlobalId };
use super::node::Node;
use once_cell::sync::Lazy;
use common::types::{Block, NodeDetails, NodeId, NodeLocation, Timestamp};
use common::types::{Block, NodeDetails, NodeLocation, Timestamp};
use common::util::{now, DenseMap, NumStats};
use common::node::Payload;
use std::iter::IntoIterator;
use super::chain::Chain;
use super::chain::{ self, Chain };
pub type ChainId = usize;
pub type NodeId = usize;
pub type Label = Arc<str>;
/// Our state constains node and chain information
pub struct State {
chains: DenseMap<Chain>,
nodes: HashMap<GlobalId, Node>,
chains_by_genesis_hash: HashMap<BlockHash, ChainId>,
chains_by_label: HashMap<Label, ChainId>,
next_id: NodeId,
chains: HashMap<BlockHash, Chain>,
chains_by_label: HashMap<Label, BlockHash>,
chains_by_node: HashMap<NodeId, BlockHash>,
/// Denylist for networks we do not want to allow connecting.
denylist: HashSet<String>,
}
/// Labels of chains we consider "first party". These chains allow any
/// number of nodes to connect.
static FIRST_PARTY_NETWORKS: Lazy<HashSet<&'static str>> = Lazy::new(|| {
let mut set = HashSet::new();
set.insert("Polkadot");
set.insert("Kusama");
set.insert("Westend");
set.insert("Rococo");
set
});
/// Max number of nodes allowed to connect to the telemetry server.
const THIRD_PARTY_NETWORKS_MAX_NODES: usize = 500;
/// Adding a node to a chain leads to this result:
pub enum AddNodeResult {
pub enum AddNodeResult<'a> {
/// The chain is on the "deny list", so we can't add the node
ChainOnDenyList,
/// The chain is over quota (too many nodes connected), so can't add the node
ChainOverQuota,
/// The node was added to the chain
NodeAddedToChain(NodeAddedToChain)
NodeAddedToChain(NodeAddedToChain<'a>)
}
pub struct NodeAddedToChain {
/// The label for the chain (which may have changed as a result of adding the node):
chain_label: Arc<str>,
pub struct NodeAddedToChain<'a> {
/// The ID assigned to this node.
pub id: NodeId,
/// The chain the node was added to.
pub chain: &'a Chain,
/// The node that was added.
pub node: &'a Node,
/// Is this chain newly added?
pub chain_just_added: bool,
/// Has the chain label been updated?
has_chain_label_changed: bool,
// How many nodes now exist in the chain?
chain_node_count: usize
pub has_chain_label_changed: bool
}
pub struct RemoveNodeResult {
@@ -65,10 +53,10 @@ pub struct RemoveNodeResult {
impl State {
pub fn new<T: IntoIterator<Item=String>>(denylist: T) -> State {
State {
chains: DenseMap::new(),
nodes: HashMap::new(),
chains_by_genesis_hash: HashMap::new(),
next_id: 0,
chains: HashMap::new(),
chains_by_label: HashMap::new(),
chains_by_node: HashMap::new(),
denylist: denylist.into_iter().collect()
}
}
@@ -82,12 +70,40 @@ impl State {
pub fn get_chain_by_label(&self, label: &str) -> Option<&Chain> {
self.chains_by_label
.get(label)
.and_then(|chain_id| self.chains.get(*chain_id))
.and_then(|chain_id| self.chains.get(chain_id))
}
pub fn get_nodes_in_chain<'s>(&'s self, chain: &'s Chain) -> impl Iterator<Item=(GlobalId,&Node)> {
chain.node_ids()
.filter_map(move |id| self.nodes.get(&id).map(|node| (id, node)))
pub fn add_node(&mut self, genesis_hash: BlockHash, node_details: NodeDetails) -> AddNodeResult<'_> {
if self.denylist.contains(&*node_details.chain) {
return AddNodeResult::ChainOnDenyList;
}
let chain = self.chains
.entry(genesis_hash)
.or_insert_with(|| Chain::new(node_details.chain.clone()));
if !chain.can_add_node() {
return AddNodeResult::ChainOverQuota;
}
let node_id = self.next_id;
self.next_id += 1;
match chain.add_node(node_id, node_details) {
chain::AddNodeResult::Overquota => {
AddNodeResult::ChainOverQuota
},
chain::AddNodeResult::Added { chain_renamed } => {
let node = chain.get_node(node_id).unwrap();
AddNodeResult::NodeAddedToChain(NodeAddedToChain {
id: node_id,
chain: chain,
node: node,
chain_just_added: chain.node_count() == 1,
has_chain_label_changed: chain_renamed
})
}
}
}
// /// Add a new node to our state.
@@ -117,13 +133,3 @@ impl State {
// }
}
/// First party networks (Polkadot, Kusama etc) are allowed any number of nodes.
/// Third party networks are allowed `THIRD_PARTY_NETWORKS_MAX_NODES` nodes and
/// no more.
fn max_nodes(label: &str) -> usize {
if FIRST_PARTY_NETWORKS.contains(label) {
usize::MAX
} else {
THIRD_PARTY_NETWORKS_MAX_NODES
}
}