mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-06-12 15:51:00 +00:00
Pass through node details
This commit is contained in:
@@ -20,13 +20,14 @@ export default class Aggregator extends EventEmitter {
|
||||
this.broadcast(Feed.addedNode(node));
|
||||
|
||||
node.once('disconnect', () => {
|
||||
node.removeAllListeners('block');
|
||||
node.removeAllListeners();
|
||||
|
||||
this.nodes.remove(node);
|
||||
this.broadcast(Feed.removedNode(node));
|
||||
});
|
||||
|
||||
node.on('block', () => this.updateBlock(node));
|
||||
node.on('stats', () => this.broadcast(Feed.stats(node)));
|
||||
}
|
||||
|
||||
public addFeed(feed: Feed) {
|
||||
@@ -40,7 +41,7 @@ export default class Aggregator extends EventEmitter {
|
||||
|
||||
feed.once('disconnect', () => {
|
||||
this.feeds.remove(feed);
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
public nodeList(): IterableIterator<Node> {
|
||||
|
||||
@@ -41,7 +41,7 @@ export default class Feed extends EventEmitter {
|
||||
public static addedNode(node: Node): FeedData {
|
||||
return serialize({
|
||||
action: 'added',
|
||||
payload: [node.id, node.nodeDetails(), node.blockDetails()]
|
||||
payload: [node.id, node.nodeDetails(), node.nodeStats(), node.blockDetails()]
|
||||
})
|
||||
}
|
||||
|
||||
@@ -59,6 +59,13 @@ export default class Feed extends EventEmitter {
|
||||
});
|
||||
}
|
||||
|
||||
public static stats(node: Node): FeedData {
|
||||
return serialize({
|
||||
action: 'stats',
|
||||
payload: [node.id, node.nodeStats()]
|
||||
});
|
||||
}
|
||||
|
||||
public send(data: FeedData) {
|
||||
this.socket.send(data);
|
||||
}
|
||||
|
||||
@@ -34,7 +34,7 @@ interface MessageBase {
|
||||
}
|
||||
|
||||
export interface BestBlock {
|
||||
best: string,
|
||||
best: Types.BlockHash,
|
||||
height: Types.BlockNumber,
|
||||
ts: Date,
|
||||
}
|
||||
@@ -44,14 +44,14 @@ interface SystemConnected {
|
||||
name: Types.NodeName,
|
||||
chain: string,
|
||||
config: string,
|
||||
implementation: string,
|
||||
version: string,
|
||||
implementation: Types.NodeImplementation,
|
||||
version: Types.NodeVersion,
|
||||
}
|
||||
|
||||
interface SystemInterval extends BestBlock {
|
||||
export interface SystemInterval extends BestBlock {
|
||||
msg: 'system.interval',
|
||||
txcount: number,
|
||||
peers: number,
|
||||
txcount: Types.TransactionCount,
|
||||
peers: Types.PeerCount,
|
||||
status: 'Idle' | string, // TODO: 'Idle' | ...?
|
||||
}
|
||||
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import * as WebSocket from 'ws';
|
||||
import * as EventEmitter from 'events';
|
||||
import { Maybe, Types, idGenerator } from '@dotstats/common';
|
||||
import { parseMessage, getBestBlock, Message, BestBlock } from './message';
|
||||
import { parseMessage, getBestBlock, Message, BestBlock, SystemInterval } from './message';
|
||||
|
||||
const BLOCK_TIME_HISTORY = 10;
|
||||
const TIMEOUT = 1000 * 60 * 5; // 5 seconds
|
||||
const TIMEOUT = 1000 * 60 * 1; // 1 minute
|
||||
|
||||
const nextId = idGenerator<Types.NodeId>();
|
||||
|
||||
@@ -12,18 +12,27 @@ export default class Node extends EventEmitter {
|
||||
public lastMessage: number;
|
||||
public id: Types.NodeId;
|
||||
public name: Types.NodeName;
|
||||
public implementation: string;
|
||||
public version: string;
|
||||
public implementation: Types.NodeImplementation;
|
||||
public version: Types.NodeVersion;
|
||||
public config: string;
|
||||
public height = 0 as Types.BlockNumber;
|
||||
public latency = 0 as Types.Milliseconds;
|
||||
public blockTime = 0 as Types.Milliseconds;
|
||||
|
||||
private peers = 0 as Types.PeerCount;
|
||||
private txcount = 0 as Types.TransactionCount;
|
||||
|
||||
private socket: WebSocket;
|
||||
private blockTimes: Array<number> = new Array(BLOCK_TIME_HISTORY);
|
||||
private lastBlockAt: Maybe<Date> = null;
|
||||
|
||||
constructor(socket: WebSocket, name: Types.NodeName, config: string, implentation: string, version: string) {
|
||||
constructor(
|
||||
socket: WebSocket,
|
||||
name: Types.NodeName,
|
||||
config: string,
|
||||
implentation: Types.NodeImplementation,
|
||||
version: Types.NodeVersion,
|
||||
) {
|
||||
super();
|
||||
|
||||
this.lastMessage = Date.now();
|
||||
@@ -37,11 +46,11 @@ export default class Node extends EventEmitter {
|
||||
console.log(`Listening to a new node: ${name}`);
|
||||
|
||||
socket.on('message', (data) => {
|
||||
console.log(data);
|
||||
|
||||
const message = parseMessage(data);
|
||||
|
||||
if (!message) return;
|
||||
if (!message) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.lastMessage = Date.now();
|
||||
this.updateLatency(message.ts);
|
||||
@@ -51,6 +60,10 @@ export default class Node extends EventEmitter {
|
||||
if (update) {
|
||||
this.updateBestBlock(update);
|
||||
}
|
||||
|
||||
if (message.msg === 'system.interval') {
|
||||
this.onSystemInterval(message);
|
||||
}
|
||||
});
|
||||
|
||||
socket.on('close', () => {
|
||||
@@ -74,8 +87,6 @@ export default class Node extends EventEmitter {
|
||||
}
|
||||
|
||||
function handler(data: WebSocket.Data) {
|
||||
console.log(data);
|
||||
|
||||
const message = parseMessage(data);
|
||||
|
||||
if (message && message.msg === "system.connected") {
|
||||
@@ -106,16 +117,15 @@ export default class Node extends EventEmitter {
|
||||
}
|
||||
|
||||
public nodeDetails(): Types.NodeDetails {
|
||||
return {
|
||||
name: this.name,
|
||||
};
|
||||
return [this.name, this.implementation, this.version];
|
||||
}
|
||||
|
||||
public nodeStats(): Types.NodeStats {
|
||||
return [this.peers, this.txcount];
|
||||
}
|
||||
|
||||
public blockDetails(): Types.BlockDetails {
|
||||
return {
|
||||
height: this.height,
|
||||
blockTime: this.blockTime,
|
||||
};
|
||||
return [this.height, this.blockTime];
|
||||
}
|
||||
|
||||
public get average(): number {
|
||||
@@ -143,6 +153,17 @@ export default class Node extends EventEmitter {
|
||||
this.emit('disconnect');
|
||||
}
|
||||
|
||||
private onSystemInterval(message: SystemInterval) {
|
||||
const { peers, txcount } = message;
|
||||
|
||||
if (this.peers !== peers || this.txcount !== txcount) {
|
||||
this.peers = peers;
|
||||
this.txcount = txcount;
|
||||
|
||||
this.emit('stats');
|
||||
}
|
||||
}
|
||||
|
||||
private updateLatency(time: Date) {
|
||||
this.latency = (this.lastMessage - +time) as Types.Milliseconds;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user