mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-05-30 14:41:07 +00:00
Remove NodeConnector from core for now; only messages from shards until refactor
This commit is contained in:
@@ -6,7 +6,6 @@ use crate::shard::connector::ShardConnector;
|
||||
use crate::chain::{self, Chain, ChainId, Label};
|
||||
use crate::feed::connector::{Connected, FeedConnector, FeedId};
|
||||
use crate::feed::{self, FeedMessageSerializer};
|
||||
use crate::node::connector::NodeConnector;
|
||||
use common::ws::MuteReason;
|
||||
use common::shard::ShardConnId;
|
||||
use common::types::{ConnId, NodeDetails, BlockHash};
|
||||
@@ -131,13 +130,6 @@ pub struct AddNode {
|
||||
}
|
||||
|
||||
pub enum NodeSource {
|
||||
Direct {
|
||||
/// Connection id used by the node connector for multiplexing parachains
|
||||
conn_id: ConnId,
|
||||
/// Address of the NodeConnector actor
|
||||
node_connector: Addr<NodeConnector>,
|
||||
},
|
||||
// TODO
|
||||
Shard {
|
||||
/// `ShardConnId` that identifies the node connection within a shard.
|
||||
sid: ShardConnId,
|
||||
@@ -202,9 +194,6 @@ pub struct GetHealth;
|
||||
impl NodeSource {
|
||||
pub fn mute(&self, reason: MuteReason) {
|
||||
match self {
|
||||
NodeSource::Direct { node_connector, .. } => {
|
||||
node_connector.do_send(reason);
|
||||
},
|
||||
// TODO
|
||||
NodeSource::Shard { shard_connector, .. } => {
|
||||
// shard_connector.do_send(Mute { reason });
|
||||
|
||||
@@ -247,15 +247,6 @@ pub struct LocateNode {
|
||||
impl NodeSource {
|
||||
pub fn init(self, nid: NodeId, chain: Addr<Chain>) -> bool {
|
||||
match self {
|
||||
NodeSource::Direct { conn_id, node_connector } => {
|
||||
node_connector
|
||||
.try_send(crate::node::connector::Initialize {
|
||||
nid,
|
||||
conn_id,
|
||||
chain,
|
||||
})
|
||||
.is_ok()
|
||||
},
|
||||
NodeSource::Shard { sid, shard_connector } => {
|
||||
shard_connector
|
||||
.try_send(crate::shard::connector::Initialize {
|
||||
|
||||
@@ -78,6 +78,8 @@ impl FeedConnector {
|
||||
fn handle_cmd(&mut self, cmd: &str, payload: &str, ctx: &mut <Self as Actor>::Context) {
|
||||
match cmd {
|
||||
"subscribe" => {
|
||||
// Hash the chain label the frontend wants to subscribe to.
|
||||
// If it's already subscribed to the same chain, nothing to do.
|
||||
match fnv(payload) {
|
||||
hash if hash == self.chain_label_hash => return,
|
||||
hash => self.chain_label_hash = hash,
|
||||
|
||||
@@ -104,8 +104,10 @@ async fn feed_route(
|
||||
stream: web::Payload,
|
||||
aggregator: web::Data<Addr<Aggregator>>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
let aggregator = aggregator.get_ref().clone();
|
||||
|
||||
ws::start(
|
||||
FeedConnector::new(aggregator.get_ref().clone()),
|
||||
FeedConnector::new(aggregator),
|
||||
&req,
|
||||
stream,
|
||||
)
|
||||
|
||||
@@ -7,8 +7,6 @@ use common::types::{
|
||||
use common::util::now;
|
||||
use common::node::SystemInterval;
|
||||
|
||||
pub mod connector;
|
||||
|
||||
/// Minimum time between block below broadcasting updates to the browser gets throttled, in ms.
|
||||
const THROTTLE_THRESHOLD: u64 = 100;
|
||||
/// Minimum time of intervals for block updates sent to the browser when throttled, in ms.
|
||||
|
||||
@@ -1,229 +0,0 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::net::Ipv4Addr;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use crate::aggregator::{AddNode, Aggregator, NodeSource};
|
||||
use crate::chain::{Chain, RemoveNode, UpdateNode};
|
||||
use crate::location::LocateRequest;
|
||||
use crate::node::NodeId;
|
||||
use actix::prelude::*;
|
||||
use actix_web_actors::ws::{self, CloseReason};
|
||||
use bytes::Bytes;
|
||||
use common::types::ConnId;
|
||||
use common::ws::{MultipartHandler, WsMessage, MuteReason};
|
||||
use common::node::{NodeMessage, Payload};
|
||||
|
||||
/// How often heartbeat pings are sent
|
||||
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20);
|
||||
/// How long before lack of client response causes a timeout
|
||||
const CLIENT_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
pub struct NodeConnector {
|
||||
/// Multiplexing connections by id
|
||||
multiplex: BTreeMap<ConnId, ConnMultiplex>,
|
||||
/// Client must send ping at least once every 60 seconds (CLIENT_TIMEOUT),
|
||||
hb: Instant,
|
||||
/// Aggregator actor address
|
||||
aggregator: Addr<Aggregator>,
|
||||
/// IP address of the node this connector is responsible for
|
||||
ip: Option<Ipv4Addr>,
|
||||
/// Actix address of location services
|
||||
locator: Recipient<LocateRequest>,
|
||||
/// Helper for handling continuation messages
|
||||
multipart: MultipartHandler,
|
||||
}
|
||||
|
||||
enum ConnMultiplex {
|
||||
Connected {
|
||||
/// Id of the node this multiplex connector is responsible for handling
|
||||
nid: NodeId,
|
||||
/// Chain address to which this multiplex connector is delegating messages
|
||||
chain: Addr<Chain>,
|
||||
},
|
||||
Waiting {
|
||||
/// Backlog of messages to be sent once we get a recipient handle to the chain
|
||||
backlog: Vec<Payload>,
|
||||
},
|
||||
}
|
||||
|
||||
impl Default for ConnMultiplex {
|
||||
fn default() -> Self {
|
||||
ConnMultiplex::Waiting {
|
||||
backlog: Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Actor for NodeConnector {
|
||||
type Context = ws::WebsocketContext<Self>;
|
||||
|
||||
fn started(&mut self, ctx: &mut Self::Context) {
|
||||
self.heartbeat(ctx);
|
||||
}
|
||||
|
||||
fn stopped(&mut self, _: &mut Self::Context) {
|
||||
for mx in self.multiplex.values() {
|
||||
if let ConnMultiplex::Connected { chain, nid } = mx {
|
||||
chain.do_send(RemoveNode(*nid));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl NodeConnector {
|
||||
pub fn new(
|
||||
aggregator: Addr<Aggregator>,
|
||||
locator: Recipient<LocateRequest>,
|
||||
ip: Option<Ipv4Addr>,
|
||||
) -> Self {
|
||||
Self {
|
||||
multiplex: BTreeMap::new(),
|
||||
hb: Instant::now(),
|
||||
aggregator,
|
||||
ip,
|
||||
locator,
|
||||
multipart: MultipartHandler::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn heartbeat(&self, ctx: &mut <Self as Actor>::Context) {
|
||||
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
|
||||
// check client heartbeats
|
||||
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
|
||||
// stop actor
|
||||
ctx.close(Some(CloseReason {
|
||||
code: ws::CloseCode::Abnormal,
|
||||
description: Some("Missed heartbeat".into()),
|
||||
}));
|
||||
ctx.stop();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn handle_message(
|
||||
&mut self,
|
||||
msg: NodeMessage,
|
||||
ctx: &mut <Self as Actor>::Context,
|
||||
) {
|
||||
let conn_id = msg.id();
|
||||
let payload = msg.into();
|
||||
|
||||
match self.multiplex.entry(conn_id).or_default() {
|
||||
ConnMultiplex::Connected { nid, chain } => {
|
||||
chain.do_send(UpdateNode {
|
||||
nid: *nid,
|
||||
payload,
|
||||
});
|
||||
}
|
||||
ConnMultiplex::Waiting { backlog } => {
|
||||
if let Payload::SystemConnected(connected) = payload {
|
||||
self.aggregator.do_send(AddNode {
|
||||
node: connected.node,
|
||||
genesis_hash: connected.genesis_hash,
|
||||
source: NodeSource::Direct {
|
||||
conn_id,
|
||||
node_connector: ctx.address(),
|
||||
},
|
||||
});
|
||||
} else {
|
||||
if backlog.len() >= 10 {
|
||||
backlog.remove(0);
|
||||
}
|
||||
|
||||
backlog.push(payload);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<MuteReason> for NodeConnector {
|
||||
type Result = ();
|
||||
fn handle(&mut self, msg: MuteReason, ctx: &mut Self::Context) {
|
||||
log::debug!(target: "NodeConnector::Mute", "Muting a node. Reason: {:?}", msg);
|
||||
|
||||
ctx.close(Some(msg.into()));
|
||||
ctx.stop();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct Initialize {
|
||||
pub nid: NodeId,
|
||||
pub conn_id: ConnId,
|
||||
pub chain: Addr<Chain>,
|
||||
}
|
||||
|
||||
impl Handler<Initialize> for NodeConnector {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: Initialize, _: &mut Self::Context) {
|
||||
let Initialize {
|
||||
nid,
|
||||
conn_id,
|
||||
chain,
|
||||
} = msg;
|
||||
log::trace!(target: "NodeConnector::Initialize", "Initializing a node, nid={}, on conn_id={}", nid, conn_id);
|
||||
let mx = self.multiplex.entry(conn_id).or_default();
|
||||
|
||||
if let ConnMultiplex::Waiting { backlog } = mx {
|
||||
for payload in backlog.drain(..) {
|
||||
chain.do_send(UpdateNode {
|
||||
nid,
|
||||
payload,
|
||||
});
|
||||
}
|
||||
|
||||
*mx = ConnMultiplex::Connected {
|
||||
nid,
|
||||
chain: chain.clone(),
|
||||
};
|
||||
};
|
||||
|
||||
// Acquire the node's physical location
|
||||
if let Some(ip) = self.ip {
|
||||
let _ = self.locator.do_send(LocateRequest { ip, nid, chain });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for NodeConnector {
|
||||
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
|
||||
self.hb = Instant::now();
|
||||
|
||||
let data = match msg.map(|msg| self.multipart.handle(msg)) {
|
||||
Ok(WsMessage::Nop) => return,
|
||||
Ok(WsMessage::Ping(msg)) => {
|
||||
ctx.pong(&msg);
|
||||
return;
|
||||
}
|
||||
Ok(WsMessage::Data(data)) => data,
|
||||
Ok(WsMessage::Close(reason)) => {
|
||||
ctx.close(reason);
|
||||
ctx.stop();
|
||||
return;
|
||||
}
|
||||
Err(error) => {
|
||||
log::error!("{:?}", error);
|
||||
ctx.stop();
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match serde_json::from_slice(&data) {
|
||||
Ok(msg) => self.handle_message(msg, ctx),
|
||||
#[cfg(debug)]
|
||||
Err(err) => {
|
||||
let data: &[u8] = data.get(..512).unwrap_or_else(|| &data);
|
||||
log::warn!(
|
||||
"Failed to parse node message: {} {}",
|
||||
err,
|
||||
std::str::from_utf8(data).unwrap_or_else(|_| "INVALID UTF8")
|
||||
)
|
||||
}
|
||||
#[cfg(not(debug))]
|
||||
Err(_) => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1 +0,0 @@
|
||||
pub mod connector;
|
||||
@@ -0,0 +1 @@
|
||||
pub mod connector;
|
||||
@@ -100,8 +100,6 @@ impl Chain {
|
||||
payload,
|
||||
};
|
||||
|
||||
println!("Serialize {:?}", msg);
|
||||
|
||||
let bytes = bincode::options().serialize(&msg).unwrap();
|
||||
|
||||
println!("Sending update: {} bytes", bytes.len());
|
||||
|
||||
Reference in New Issue
Block a user