mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-04-29 21:57:57 +00:00
Aggregate chains by network_id (#234)
* Aggregate chains by network_id * Fix network_id handling
This commit is contained in:
+91
-23
@@ -10,6 +10,7 @@ use crate::types::{NodeDetails, NodeId};
|
||||
|
||||
pub struct Aggregator {
|
||||
labels: HashMap<Label, ChainId>,
|
||||
networks: HashMap<Label, ChainId>,
|
||||
chains: DenseMap<ChainEntry>,
|
||||
feeds: DenseMap<Addr<FeedConnector>>,
|
||||
serializer: FeedMessageSerializer,
|
||||
@@ -18,6 +19,7 @@ pub struct Aggregator {
|
||||
pub struct ChainEntry {
|
||||
addr: Addr<Chain>,
|
||||
label: Label,
|
||||
network_id: Option<Label>,
|
||||
nodes: usize,
|
||||
}
|
||||
|
||||
@@ -25,6 +27,7 @@ impl Aggregator {
|
||||
pub fn new() -> Self {
|
||||
Aggregator {
|
||||
labels: HashMap::new(),
|
||||
networks: HashMap::new(),
|
||||
chains: DenseMap::new(),
|
||||
feeds: DenseMap::new(),
|
||||
serializer: FeedMessageSerializer::new(),
|
||||
@@ -33,33 +36,52 @@ impl Aggregator {
|
||||
|
||||
/// Get an address to the chain actor by name. If the address is not found,
|
||||
/// or the address is disconnected (actor dropped), create a new one.
|
||||
pub fn lazy_chain(&mut self, label: Label, ctx: &mut <Self as Actor>::Context) -> &mut ChainEntry {
|
||||
let (cid, found) = self.labels
|
||||
.get(&label)
|
||||
.map(|&cid| (cid, true))
|
||||
.unwrap_or_else(|| {
|
||||
pub fn lazy_chain(
|
||||
&mut self,
|
||||
label: &str,
|
||||
network: &Option<Label>,
|
||||
ctx: &mut <Self as Actor>::Context,
|
||||
) -> ChainId {
|
||||
let cid = match self.get_chain_id(label, network.as_ref()) {
|
||||
Some(cid) => cid,
|
||||
None => {
|
||||
self.serializer.push(feed::AddedChain(&label, 1));
|
||||
|
||||
let addr = ctx.address();
|
||||
let label = label.clone();
|
||||
let cid = self.chains.add_with(move |cid| {
|
||||
let label: Label = label.into();
|
||||
let cid = self.chains.add_with(|cid| {
|
||||
ChainEntry {
|
||||
addr: Chain::new(cid, addr, label.clone()).start(),
|
||||
label,
|
||||
label: label.clone(),
|
||||
network_id: network.clone(),
|
||||
nodes: 1,
|
||||
}
|
||||
});
|
||||
|
||||
self.labels.insert(label, cid);
|
||||
|
||||
if let Some(network) = network {
|
||||
self.networks.insert(network.clone(), cid);
|
||||
}
|
||||
|
||||
self.broadcast();
|
||||
|
||||
(cid, false)
|
||||
});
|
||||
cid
|
||||
}
|
||||
};
|
||||
|
||||
if !found {
|
||||
self.labels.insert(label, cid);
|
||||
cid
|
||||
}
|
||||
|
||||
fn get_chain_id(&self, label: &str, network: Option<&Label>) -> Option<ChainId> {
|
||||
let labels = &self.labels;
|
||||
let networks = &self.networks;
|
||||
|
||||
if let Some(network) = network {
|
||||
networks.get(&**network).or_else(|| labels.get(label)).copied()
|
||||
} else {
|
||||
labels.get(label).copied()
|
||||
}
|
||||
|
||||
self.chains.get_mut(cid).expect("Entry just created above; qed")
|
||||
}
|
||||
|
||||
fn get_chain(&mut self, label: &str) -> Option<&mut ChainEntry> {
|
||||
@@ -84,13 +106,16 @@ impl Actor for Aggregator {
|
||||
#[derive(Message)]
|
||||
pub struct AddNode {
|
||||
pub node: NodeDetails,
|
||||
pub chain: Label,
|
||||
pub network_id: Option<Label>,
|
||||
pub rec: Recipient<Initialize>,
|
||||
}
|
||||
|
||||
/// Message sent from the Chain to the Aggregator when the Chain loses all nodes
|
||||
#[derive(Message)]
|
||||
pub struct DropChain(pub Label);
|
||||
pub struct DropChain(pub ChainId);
|
||||
|
||||
#[derive(Message)]
|
||||
pub struct RenameChain(pub ChainId, pub Label);
|
||||
|
||||
/// Message sent from the FeedConnector to the Aggregator when subscribing to a new chain
|
||||
pub struct Subscribe {
|
||||
@@ -146,9 +171,20 @@ impl Handler<AddNode> for Aggregator {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
|
||||
let AddNode { node, chain, rec } = msg;
|
||||
let AddNode { node, network_id, rec } = msg;
|
||||
|
||||
self.lazy_chain(chain, ctx).addr.do_send(chain::AddNode {
|
||||
let cid = self.lazy_chain(&node.chain, &network_id, ctx);
|
||||
let chain = self.chains.get_mut(cid).expect("Entry just created above; qed");
|
||||
|
||||
if let Some(network_id) = network_id {
|
||||
// Attach network id to the chain if it was not done yet
|
||||
if chain.network_id.is_none() {
|
||||
chain.network_id = Some(network_id.clone());
|
||||
self.networks.insert(network_id, cid);
|
||||
}
|
||||
}
|
||||
|
||||
chain.addr.do_send(chain::AddNode {
|
||||
node,
|
||||
rec,
|
||||
});
|
||||
@@ -159,15 +195,47 @@ impl Handler<DropChain> for Aggregator {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: DropChain, _: &mut Self::Context) {
|
||||
let DropChain(label) = msg;
|
||||
let DropChain(cid) = msg;
|
||||
|
||||
if let Some(cid) = self.labels.remove(&label) {
|
||||
self.chains.remove(cid);
|
||||
self.serializer.push(feed::RemovedChain(&label));
|
||||
if let Some(entry) = self.chains.remove(cid) {
|
||||
let label = &entry.label;
|
||||
self.labels.remove(label);
|
||||
if let Some(network) = entry.network_id {
|
||||
self.networks.remove(&network);
|
||||
}
|
||||
|
||||
self.serializer.push(feed::RemovedChain(label));
|
||||
info!("Dropped chain [{}] from the aggregator", label);
|
||||
self.broadcast();
|
||||
}
|
||||
|
||||
info!("Dropped chain [{}] from the aggregator", label);
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<RenameChain> for Aggregator {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: RenameChain, _: &mut Self::Context) {
|
||||
let RenameChain(cid, new) = msg;
|
||||
|
||||
if let Some(entry) = self.chains.get_mut(cid) {
|
||||
if entry.label == new {
|
||||
return;
|
||||
}
|
||||
|
||||
// Update UI
|
||||
self.serializer.push(feed::RemovedChain(&entry.label));
|
||||
self.serializer.push(feed::AddedChain(&new, entry.nodes));
|
||||
|
||||
// Update labels -> cid map
|
||||
self.labels.remove(&entry.label);
|
||||
self.labels.insert(new.clone(), cid);
|
||||
|
||||
// Update entry
|
||||
entry.label = new;
|
||||
|
||||
self.broadcast();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+64
-10
@@ -1,9 +1,10 @@
|
||||
use std::collections::HashMap;
|
||||
use actix::prelude::*;
|
||||
use std::sync::Arc;
|
||||
use bytes::Bytes;
|
||||
use rustc_hash::FxHashMap;
|
||||
|
||||
use crate::aggregator::{Aggregator, DropChain, NodeCount};
|
||||
use crate::aggregator::{Aggregator, DropChain, RenameChain, NodeCount};
|
||||
use crate::node::{Node, connector::Initialize, message::{NodeMessage, Details}};
|
||||
use crate::feed::connector::{FeedId, FeedConnector, Subscribed, Unsubscribed};
|
||||
use crate::feed::{self, FeedMessageSerializer};
|
||||
@@ -19,8 +20,8 @@ pub struct Chain {
|
||||
cid: ChainId,
|
||||
/// Who to inform if we Chain drops itself
|
||||
aggregator: Addr<Aggregator>,
|
||||
/// Label of this chain
|
||||
label: Label,
|
||||
/// Label of this chain, along with count of nodes that use this label
|
||||
label: (Label, usize),
|
||||
/// Dense mapping of NodeId -> Node
|
||||
nodes: DenseMap<Node>,
|
||||
/// Dense mapping of FeedId -> Addr<FeedConnector>,
|
||||
@@ -39,6 +40,8 @@ pub struct Chain {
|
||||
serializer: FeedMessageSerializer,
|
||||
/// When the best block first arrived
|
||||
timestamp: Option<Timestamp>,
|
||||
/// Some nodes might manifest a different label, note them here
|
||||
labels: HashMap<Label, usize>,
|
||||
}
|
||||
|
||||
impl Chain {
|
||||
@@ -48,7 +51,7 @@ impl Chain {
|
||||
Chain {
|
||||
cid,
|
||||
aggregator,
|
||||
label,
|
||||
label: (label, 0),
|
||||
nodes: DenseMap::new(),
|
||||
feeds: DenseMap::new(),
|
||||
finality_feeds: FxHashMap::default(),
|
||||
@@ -58,9 +61,56 @@ impl Chain {
|
||||
average_block_time: None,
|
||||
serializer: FeedMessageSerializer::new(),
|
||||
timestamp: None,
|
||||
labels: HashMap::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn increment_label_count(&mut self, label: &str) {
|
||||
let count = match self.labels.get_mut(label) {
|
||||
Some(count) => {
|
||||
*count += 1;
|
||||
*count
|
||||
},
|
||||
None => {
|
||||
self.labels.insert(label.into(), 1);
|
||||
1
|
||||
},
|
||||
};
|
||||
|
||||
if &*self.label.0 == label {
|
||||
self.label.1 += 1;
|
||||
} else {
|
||||
if count > self.label.1 {
|
||||
self.rename(label.into(), count);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn decrement_label_count(&mut self, label: &str) {
|
||||
match self.labels.get_mut(label) {
|
||||
Some(count) => *count -= 1,
|
||||
None => return,
|
||||
};
|
||||
|
||||
if &*self.label.0 == label {
|
||||
self.label.1 -= 1;
|
||||
|
||||
for (label, &count) in self.labels.iter() {
|
||||
if count > self.label.1 {
|
||||
let label: Arc<_> = label.clone();
|
||||
self.rename(label, count);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn rename(&mut self, label: Label, count: usize) {
|
||||
self.label = (label, count);
|
||||
|
||||
self.aggregator.do_send(RenameChain(self.cid, self.label.0.clone()));
|
||||
}
|
||||
|
||||
fn broadcast(&mut self) {
|
||||
if let Some(msg) = self.serializer.finalize() {
|
||||
for (_, feed) in self.feeds.iter() {
|
||||
@@ -132,7 +182,7 @@ impl Actor for Chain {
|
||||
type Context = Context<Self>;
|
||||
|
||||
fn stopped(&mut self, _: &mut Self::Context) {
|
||||
self.aggregator.do_send(DropChain(self.label.clone()));
|
||||
self.aggregator.do_send(DropChain(self.cid));
|
||||
|
||||
for (_, feed) in self.feeds.iter() {
|
||||
feed.do_send(Unsubscribed)
|
||||
@@ -190,6 +240,8 @@ impl Handler<AddNode> for Chain {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: AddNode, ctx: &mut Self::Context) {
|
||||
self.increment_label_count(&msg.node.chain);
|
||||
|
||||
let nid = self.nodes.add(Node::new(msg.node));
|
||||
|
||||
if let Err(_) = msg.rec.do_send(Initialize(nid, ctx.address())) {
|
||||
@@ -221,7 +273,7 @@ impl Chain {
|
||||
self.best = *block;
|
||||
info!(
|
||||
"[{}] [{}/{}] new best block ({}) {:?}",
|
||||
self.label,
|
||||
self.label.0,
|
||||
nodes_len,
|
||||
self.feeds.len(),
|
||||
self.best.height,
|
||||
@@ -361,10 +413,12 @@ impl Handler<RemoveNode> for Chain {
|
||||
fn handle(&mut self, msg: RemoveNode, ctx: &mut Self::Context) {
|
||||
let RemoveNode(nid) = msg;
|
||||
|
||||
self.nodes.remove(nid);
|
||||
if let Some(node) = self.nodes.remove(nid) {
|
||||
self.decrement_label_count(&node.details().chain);
|
||||
}
|
||||
|
||||
if self.nodes.is_empty() {
|
||||
info!("[{}] Lost all nodes, dropping...", self.label);
|
||||
info!("[{}] Lost all nodes, dropping...", self.label.0);
|
||||
ctx.stop();
|
||||
}
|
||||
|
||||
@@ -383,7 +437,7 @@ impl Handler<Subscribe> for Chain {
|
||||
|
||||
feed.do_send(Subscribed(fid, ctx.address().recipient()));
|
||||
|
||||
self.serializer.push(feed::SubscribedTo(&self.label));
|
||||
self.serializer.push(feed::SubscribedTo(&self.label.0));
|
||||
self.serializer.push(feed::TimeSync(now()));
|
||||
self.serializer.push(feed::BestBlock(
|
||||
self.best.height,
|
||||
@@ -445,7 +499,7 @@ impl Handler<Unsubscribe> for Chain {
|
||||
let Unsubscribe(fid) = msg;
|
||||
|
||||
if let Some(feed) = self.feeds.get(fid) {
|
||||
self.serializer.push(feed::UnsubscribedFrom(&self.label));
|
||||
self.serializer.push(feed::UnsubscribedFrom(&self.label.0));
|
||||
|
||||
if let Some(serialized) = self.serializer.finalize() {
|
||||
feed.do_send(serialized);
|
||||
|
||||
@@ -82,11 +82,11 @@ impl NodeConnector {
|
||||
}
|
||||
|
||||
if let Details::SystemConnected(connected) = msg.details {
|
||||
let SystemConnected { chain, node } = connected;
|
||||
let SystemConnected { network_id, node } = connected;
|
||||
let rec = ctx.address().recipient();
|
||||
let chain = chain.into();
|
||||
let network_id = network_id.map(Into::into);
|
||||
|
||||
self.aggregator.do_send(AddNode { rec, chain, node });
|
||||
self.aggregator.do_send(AddNode { rec, network_id, node });
|
||||
} else {
|
||||
if self.backlog.len() >= 10 {
|
||||
self.backlog.remove(0);
|
||||
|
||||
@@ -56,7 +56,7 @@ pub enum Details {
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct SystemConnected {
|
||||
pub chain: Box<str>,
|
||||
pub network_id: Option<Box<str>>,
|
||||
#[serde(flatten)]
|
||||
pub node: NodeDetails,
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ pub use primitive_types::H256 as BlockHash;
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct NodeDetails {
|
||||
pub chain: Box<str>,
|
||||
pub name: Box<str>,
|
||||
pub implementation: Box<str>,
|
||||
pub version: Box<str>,
|
||||
|
||||
Reference in New Issue
Block a user