mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-06-15 15:21:03 +00:00
Reformatting
This commit is contained in:
@@ -5,80 +5,80 @@ import FeedSet from './FeedSet';
|
||||
import { Types, FeedMessage } from '@dotstats/common';
|
||||
|
||||
export default class Aggregator {
|
||||
private readonly chains = new Map<Types.ChainLabel, Chain>();
|
||||
private readonly feeds = new FeedSet();
|
||||
private readonly chains = new Map<Types.ChainLabel, Chain>();
|
||||
private readonly feeds = new FeedSet();
|
||||
|
||||
constructor() {
|
||||
setInterval(() => this.timeoutCheck(), 10000);
|
||||
constructor() {
|
||||
setInterval(() => this.timeoutCheck(), 10000);
|
||||
}
|
||||
|
||||
public addNode(node: Node) {
|
||||
let chain = this.getChain(node.chain);
|
||||
|
||||
chain.addNode(node);
|
||||
}
|
||||
|
||||
public addFeed(feed: Feed) {
|
||||
this.feeds.add(feed);
|
||||
|
||||
for (const chain of this.chains.values()) {
|
||||
feed.sendMessage(Feed.addedChain(chain.label));
|
||||
}
|
||||
|
||||
public addNode(node: Node) {
|
||||
let chain = this.getChain(node.chain);
|
||||
feed.events.on('subscribe', (label: Types.ChainLabel) => {
|
||||
const chain = this.chains.get(label);
|
||||
|
||||
chain.addNode(node);
|
||||
}
|
||||
if (chain) {
|
||||
chain.addFeed(feed);
|
||||
feed.sendMessage(Feed.subscribedTo(label));
|
||||
}
|
||||
});
|
||||
|
||||
public addFeed(feed: Feed) {
|
||||
this.feeds.add(feed);
|
||||
feed.events.on('unsubscribe', (label: Types.ChainLabel) => {
|
||||
const chain = this.chains.get(label);
|
||||
|
||||
for (const chain of this.chains.values()) {
|
||||
feed.sendMessage(Feed.addedChain(chain.label));
|
||||
if (chain) {
|
||||
chain.removeFeed(feed);
|
||||
feed.sendMessage(Feed.unsubscribedFrom(label));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private getChain(label: Types.ChainLabel): Chain {
|
||||
const chain = this.chains.get(label);
|
||||
|
||||
if (chain) {
|
||||
return chain;
|
||||
} else {
|
||||
const chain = new Chain(label);
|
||||
|
||||
chain.events.on('disconnect', (count: number) => {
|
||||
if (count !== 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
feed.events.on('subscribe', (label: Types.ChainLabel) => {
|
||||
const chain = this.chains.get(label);
|
||||
chain.events.removeAllListeners();
|
||||
|
||||
if (chain) {
|
||||
chain.addFeed(feed);
|
||||
feed.sendMessage(Feed.subscribedTo(label));
|
||||
}
|
||||
})
|
||||
this.chains.delete(chain.label);
|
||||
|
||||
feed.events.on('unsubscribe', (label: Types.ChainLabel) => {
|
||||
const chain = this.chains.get(label);
|
||||
console.log(`Chain: ${label} lost all nodes`);
|
||||
this.feeds.broadcast(Feed.removedChain(label));
|
||||
});
|
||||
|
||||
if (chain) {
|
||||
chain.removeFeed(feed);
|
||||
feed.sendMessage(Feed.unsubscribedFrom(label));
|
||||
}
|
||||
});
|
||||
this.chains.set(label, chain);
|
||||
|
||||
console.log(`New chain: ${label}`);
|
||||
this.feeds.broadcast(Feed.addedChain(label));
|
||||
|
||||
return chain;
|
||||
}
|
||||
}
|
||||
|
||||
private getChain(label: Types.ChainLabel): Chain {
|
||||
const chain = this.chains.get(label);
|
||||
private timeoutCheck() {
|
||||
const empty: Types.ChainLabel[] = [];
|
||||
|
||||
if (chain) {
|
||||
return chain;
|
||||
} else {
|
||||
const chain = new Chain(label);
|
||||
|
||||
chain.events.on('disconnect', (count: number) => {
|
||||
if (count !== 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
chain.events.removeAllListeners();
|
||||
|
||||
this.chains.delete(chain.label);
|
||||
|
||||
console.log(`Chain: ${label} lost all nodes`);
|
||||
this.feeds.broadcast(Feed.removedChain(label));
|
||||
});
|
||||
|
||||
this.chains.set(label, chain);
|
||||
|
||||
console.log(`New chain: ${label}`);
|
||||
this.feeds.broadcast(Feed.addedChain(label));
|
||||
|
||||
return chain;
|
||||
}
|
||||
}
|
||||
|
||||
private timeoutCheck() {
|
||||
const empty: Types.ChainLabel[] = [];
|
||||
|
||||
for (const chain of this.chains.values()) {
|
||||
chain.timeoutCheck();
|
||||
}
|
||||
for (const chain of this.chains.values()) {
|
||||
chain.timeoutCheck();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,86 +5,86 @@ import FeedSet from './FeedSet';
|
||||
import { timestamp, Types, FeedMessage } from '@dotstats/common';
|
||||
|
||||
export default class Chain {
|
||||
private nodes = new Set<Node>();
|
||||
private feeds = new FeedSet();
|
||||
private nodes = new Set<Node>();
|
||||
private feeds = new FeedSet();
|
||||
|
||||
public readonly events = new EventEmitter();
|
||||
public readonly label: Types.ChainLabel;
|
||||
public readonly events = new EventEmitter();
|
||||
public readonly label: Types.ChainLabel;
|
||||
|
||||
public height = 0 as Types.BlockNumber;
|
||||
public blockTimestamp = 0 as Types.Timestamp;
|
||||
public height = 0 as Types.BlockNumber;
|
||||
public blockTimestamp = 0 as Types.Timestamp;
|
||||
|
||||
constructor(label: Types.ChainLabel) {
|
||||
this.label = label;
|
||||
constructor(label: Types.ChainLabel) {
|
||||
this.label = label;
|
||||
}
|
||||
|
||||
public get nodeCount(): number {
|
||||
return this.nodes.size;
|
||||
}
|
||||
|
||||
public addNode(node: Node) {
|
||||
console.log(`[${this.label}] new node: ${node.name}`);
|
||||
|
||||
this.nodes.add(node);
|
||||
this.feeds.broadcast(Feed.addedNode(node));
|
||||
|
||||
node.events.once('disconnect', () => {
|
||||
node.events.removeAllListeners();
|
||||
|
||||
this.nodes.delete(node);
|
||||
this.feeds.broadcast(Feed.removedNode(node));
|
||||
|
||||
this.events.emit('disconnect', this.nodeCount);
|
||||
});
|
||||
|
||||
node.events.on('block', () => this.updateBlock(node));
|
||||
node.events.on('stats', () => this.feeds.broadcast(Feed.stats(node)));
|
||||
}
|
||||
|
||||
public addFeed(feed: Feed) {
|
||||
this.feeds.add(feed);
|
||||
|
||||
// TODO: this is a bit unclean, find a better way
|
||||
feed.chain = this.label;
|
||||
|
||||
feed.sendMessage(Feed.timeSync());
|
||||
feed.sendMessage(Feed.bestBlock(this.height, this.blockTimestamp));
|
||||
|
||||
for (const node of this.nodes.values()) {
|
||||
feed.sendMessage(Feed.addedNode(node));
|
||||
}
|
||||
}
|
||||
|
||||
public removeFeed(feed: Feed) {
|
||||
this.feeds.remove(feed);
|
||||
}
|
||||
|
||||
public nodeList(): IterableIterator<Node> {
|
||||
return this.nodes.values();
|
||||
}
|
||||
|
||||
public timeoutCheck() {
|
||||
const now = timestamp();
|
||||
|
||||
for (const node of this.nodes.values()) {
|
||||
node.timeoutCheck(now);
|
||||
}
|
||||
|
||||
public get nodeCount(): number {
|
||||
return this.nodes.size;
|
||||
this.feeds.broadcast(Feed.timeSync());
|
||||
}
|
||||
|
||||
private updateBlock(node: Node) {
|
||||
if (node.height > this.height) {
|
||||
this.height = node.height;
|
||||
this.blockTimestamp = node.blockTimestamp;
|
||||
|
||||
this.feeds.broadcast(Feed.bestBlock(this.height, this.blockTimestamp));
|
||||
|
||||
console.log(`[${this.label}] New block ${this.height}`);
|
||||
}
|
||||
|
||||
public addNode(node: Node) {
|
||||
console.log(`[${this.label}] new node: ${node.name}`);
|
||||
this.feeds.broadcast(Feed.imported(node));
|
||||
|
||||
this.nodes.add(node);
|
||||
this.feeds.broadcast(Feed.addedNode(node));
|
||||
|
||||
node.events.once('disconnect', () => {
|
||||
node.events.removeAllListeners();
|
||||
|
||||
this.nodes.delete(node);
|
||||
this.feeds.broadcast(Feed.removedNode(node));
|
||||
|
||||
this.events.emit('disconnect', this.nodeCount);
|
||||
});
|
||||
|
||||
node.events.on('block', () => this.updateBlock(node));
|
||||
node.events.on('stats', () => this.feeds.broadcast(Feed.stats(node)));
|
||||
}
|
||||
|
||||
public addFeed(feed: Feed) {
|
||||
this.feeds.add(feed);
|
||||
|
||||
// TODO: this is a bit unclean, find a better way
|
||||
feed.chain = this.label;
|
||||
|
||||
feed.sendMessage(Feed.timeSync());
|
||||
feed.sendMessage(Feed.bestBlock(this.height, this.blockTimestamp));
|
||||
|
||||
for (const node of this.nodes.values()) {
|
||||
feed.sendMessage(Feed.addedNode(node));
|
||||
}
|
||||
}
|
||||
|
||||
public removeFeed(feed: Feed) {
|
||||
this.feeds.remove(feed);
|
||||
}
|
||||
|
||||
public nodeList(): IterableIterator<Node> {
|
||||
return this.nodes.values();
|
||||
}
|
||||
|
||||
public timeoutCheck() {
|
||||
const now = timestamp();
|
||||
|
||||
for (const node of this.nodes.values()) {
|
||||
node.timeoutCheck(now);
|
||||
}
|
||||
|
||||
this.feeds.broadcast(Feed.timeSync());
|
||||
}
|
||||
|
||||
private updateBlock(node: Node) {
|
||||
if (node.height > this.height) {
|
||||
this.height = node.height;
|
||||
this.blockTimestamp = node.blockTimestamp;
|
||||
|
||||
this.feeds.broadcast(Feed.bestBlock(this.height, this.blockTimestamp));
|
||||
|
||||
console.log(`[${this.label}] New block ${this.height}`);
|
||||
}
|
||||
|
||||
this.feeds.broadcast(Feed.imported(node));
|
||||
|
||||
console.log(`[${this.label}] ${node.name} imported ${node.height}, block time: ${node.blockTime / 1000}s, average: ${node.average / 1000}s | latency ${node.latency}`);
|
||||
}
|
||||
console.log(`[${this.label}] ${node.name} imported ${node.height}, block time: ${node.blockTime / 1000}s, average: ${node.average / 1000}s | latency ${node.latency}`);
|
||||
}
|
||||
}
|
||||
|
||||
+111
-111
@@ -7,130 +7,130 @@ const nextId = idGenerator<Types.FeedId>();
|
||||
const { Actions } = FeedMessage;
|
||||
|
||||
export default class Feed {
|
||||
public id: Types.FeedId;
|
||||
public id: Types.FeedId;
|
||||
|
||||
public chain: Maybe<Types.ChainLabel> = null;
|
||||
public readonly events = new EventEmitter();
|
||||
public chain: Maybe<Types.ChainLabel> = null;
|
||||
public readonly events = new EventEmitter();
|
||||
|
||||
private socket: WebSocket;
|
||||
private messages: Array<FeedMessage.Message> = [];
|
||||
private socket: WebSocket;
|
||||
private messages: Array<FeedMessage.Message> = [];
|
||||
|
||||
constructor(socket: WebSocket) {
|
||||
this.id = nextId();
|
||||
this.socket = socket;
|
||||
constructor(socket: WebSocket) {
|
||||
this.id = nextId();
|
||||
this.socket = socket;
|
||||
|
||||
socket.on('message', (data) => this.handleCommand(data.toString()));
|
||||
socket.on('error', () => this.disconnect());
|
||||
socket.on('close', () => this.disconnect());
|
||||
socket.on('message', (data) => this.handleCommand(data.toString()));
|
||||
socket.on('error', () => this.disconnect());
|
||||
socket.on('close', () => this.disconnect());
|
||||
}
|
||||
|
||||
public static bestBlock(height: Types.BlockNumber, ts: Types.Timestamp): FeedMessage.Message {
|
||||
return {
|
||||
action: Actions.BestBlock,
|
||||
payload: [height, ts]
|
||||
};
|
||||
}
|
||||
|
||||
public static addedNode(node: Node): FeedMessage.Message {
|
||||
return {
|
||||
action: Actions.AddedNode,
|
||||
payload: [node.id, node.nodeDetails(), node.nodeStats(), node.blockDetails()]
|
||||
};
|
||||
}
|
||||
|
||||
public static removedNode(node: Node): FeedMessage.Message {
|
||||
return {
|
||||
action: Actions.RemovedNode,
|
||||
payload: node.id
|
||||
};
|
||||
}
|
||||
|
||||
public static imported(node: Node): FeedMessage.Message {
|
||||
return {
|
||||
action: Actions.ImportedBlock,
|
||||
payload: [node.id, node.blockDetails()]
|
||||
};
|
||||
}
|
||||
|
||||
public static stats(node: Node): FeedMessage.Message {
|
||||
return {
|
||||
action: Actions.NodeStats,
|
||||
payload: [node.id, node.nodeStats()]
|
||||
};
|
||||
}
|
||||
|
||||
public static timeSync(): FeedMessage.Message {
|
||||
return {
|
||||
action: Actions.TimeSync,
|
||||
payload: timestamp()
|
||||
};
|
||||
}
|
||||
|
||||
public static addedChain(label: Types.ChainLabel): FeedMessage.Message {
|
||||
return {
|
||||
action: Actions.AddedChain,
|
||||
payload: label
|
||||
};
|
||||
}
|
||||
|
||||
public static removedChain(label: Types.ChainLabel): FeedMessage.Message {
|
||||
return {
|
||||
action: Actions.RemovedChain,
|
||||
payload: label
|
||||
}
|
||||
}
|
||||
|
||||
public static bestBlock(height: Types.BlockNumber, ts: Types.Timestamp): FeedMessage.Message {
|
||||
return {
|
||||
action: Actions.BestBlock,
|
||||
payload: [height, ts]
|
||||
};
|
||||
public static subscribedTo(label: Types.ChainLabel): FeedMessage.Message {
|
||||
return {
|
||||
action: Actions.SubscribedTo,
|
||||
payload: label,
|
||||
}
|
||||
}
|
||||
|
||||
public static addedNode(node: Node): FeedMessage.Message {
|
||||
return {
|
||||
action: Actions.AddedNode,
|
||||
payload: [node.id, node.nodeDetails(), node.nodeStats(), node.blockDetails()]
|
||||
};
|
||||
public static unsubscribedFrom(label: Types.ChainLabel): FeedMessage.Message {
|
||||
return {
|
||||
action: Actions.UnsubscribedFrom,
|
||||
payload: label,
|
||||
}
|
||||
}
|
||||
|
||||
public static removedNode(node: Node): FeedMessage.Message {
|
||||
return {
|
||||
action: Actions.RemovedNode,
|
||||
payload: node.id
|
||||
};
|
||||
public sendData(data: FeedMessage.Data) {
|
||||
this.socket.send(data);
|
||||
}
|
||||
|
||||
public sendMessage(message: FeedMessage.Message) {
|
||||
const queue = this.messages.length === 0;
|
||||
|
||||
this.messages.push(message);
|
||||
|
||||
if (queue) {
|
||||
process.nextTick(this.sendMessages);
|
||||
}
|
||||
}
|
||||
|
||||
public static imported(node: Node): FeedMessage.Message {
|
||||
return {
|
||||
action: Actions.ImportedBlock,
|
||||
payload: [node.id, node.blockDetails()]
|
||||
};
|
||||
private sendMessages = () => {
|
||||
const data = FeedMessage.serialize(this.messages);
|
||||
this.messages = [];
|
||||
this.socket.send(data);
|
||||
}
|
||||
|
||||
private handleCommand(cmd: string) {
|
||||
if (cmd.startsWith('subscribe:')) {
|
||||
if (this.chain) {
|
||||
this.events.emit('unsubscribe', this.chain);
|
||||
this.chain = null;
|
||||
}
|
||||
|
||||
const label = cmd.substr(10) as Types.ChainLabel;
|
||||
|
||||
this.events.emit('subscribe', label);
|
||||
}
|
||||
}
|
||||
|
||||
public static stats(node: Node): FeedMessage.Message {
|
||||
return {
|
||||
action: Actions.NodeStats,
|
||||
payload: [node.id, node.nodeStats()]
|
||||
};
|
||||
}
|
||||
private disconnect() {
|
||||
this.socket.removeAllListeners();
|
||||
this.socket.close();
|
||||
|
||||
public static timeSync(): FeedMessage.Message {
|
||||
return {
|
||||
action: Actions.TimeSync,
|
||||
payload: timestamp()
|
||||
};
|
||||
}
|
||||
|
||||
public static addedChain(label: Types.ChainLabel): FeedMessage.Message {
|
||||
return {
|
||||
action: Actions.AddedChain,
|
||||
payload: label
|
||||
};
|
||||
}
|
||||
|
||||
public static removedChain(label: Types.ChainLabel): FeedMessage.Message {
|
||||
return {
|
||||
action: Actions.RemovedChain,
|
||||
payload: label
|
||||
}
|
||||
}
|
||||
|
||||
public static subscribedTo(label: Types.ChainLabel): FeedMessage.Message {
|
||||
return {
|
||||
action: Actions.SubscribedTo,
|
||||
payload: label,
|
||||
}
|
||||
}
|
||||
|
||||
public static unsubscribedFrom(label: Types.ChainLabel): FeedMessage.Message {
|
||||
return {
|
||||
action: Actions.UnsubscribedFrom,
|
||||
payload: label,
|
||||
}
|
||||
}
|
||||
|
||||
public sendData(data: FeedMessage.Data) {
|
||||
this.socket.send(data);
|
||||
}
|
||||
|
||||
public sendMessage(message: FeedMessage.Message) {
|
||||
const queue = this.messages.length === 0;
|
||||
|
||||
this.messages.push(message);
|
||||
|
||||
if (queue) {
|
||||
process.nextTick(this.sendMessages);
|
||||
}
|
||||
}
|
||||
|
||||
private sendMessages = () => {
|
||||
const data = FeedMessage.serialize(this.messages);
|
||||
this.messages = [];
|
||||
this.socket.send(data);
|
||||
}
|
||||
|
||||
private handleCommand(cmd: string) {
|
||||
if (cmd.startsWith('subscribe:')) {
|
||||
if (this.chain) {
|
||||
this.events.emit('unsubscribe', this.chain);
|
||||
this.chain = null;
|
||||
}
|
||||
|
||||
const label = cmd.substr(10) as Types.ChainLabel;
|
||||
|
||||
this.events.emit('subscribe', label);
|
||||
}
|
||||
}
|
||||
|
||||
private disconnect() {
|
||||
this.socket.removeAllListeners();
|
||||
this.socket.close();
|
||||
|
||||
this.events.emit('disconnect');
|
||||
}
|
||||
this.events.emit('disconnect');
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,52 +4,52 @@ import { FeedMessage } from '@dotstats/common';
|
||||
type DisconnectListener = () => void;
|
||||
|
||||
export default class FeedSet {
|
||||
private feeds = new Map<Feed, DisconnectListener>();
|
||||
private messages: Array<FeedMessage.Message> = [];
|
||||
private feeds = new Map<Feed, DisconnectListener>();
|
||||
private messages: Array<FeedMessage.Message> = [];
|
||||
|
||||
public values(): IterableIterator<Feed> {
|
||||
return this.feeds.keys();
|
||||
public values(): IterableIterator<Feed> {
|
||||
return this.feeds.keys();
|
||||
}
|
||||
|
||||
public each(fn: (feed: Feed) => void) {
|
||||
for (const feed of this.values()) {
|
||||
fn(feed);
|
||||
}
|
||||
}
|
||||
|
||||
public add(feed: Feed) {
|
||||
const listener = () => this.remove(feed);
|
||||
|
||||
this.feeds.set(feed, listener);
|
||||
|
||||
feed.events.once('disconnect', listener);
|
||||
}
|
||||
|
||||
public remove(feed: Feed) {
|
||||
const listener = this.feeds.get(feed);
|
||||
|
||||
if (!listener) {
|
||||
return;
|
||||
}
|
||||
|
||||
public each(fn: (feed: Feed) => void) {
|
||||
for (const feed of this.values()) {
|
||||
fn(feed);
|
||||
}
|
||||
feed.events.removeListener('disconnect', listener);
|
||||
|
||||
this.feeds.delete(feed);
|
||||
}
|
||||
|
||||
public broadcast(message: FeedMessage.Message) {
|
||||
const queue = this.messages.length === 0;
|
||||
|
||||
this.messages.push(message);
|
||||
|
||||
if (queue) {
|
||||
process.nextTick(this.sendMessages);
|
||||
}
|
||||
}
|
||||
|
||||
public add(feed: Feed) {
|
||||
const listener = () => this.remove(feed);
|
||||
|
||||
this.feeds.set(feed, listener);
|
||||
|
||||
feed.events.once('disconnect', listener);
|
||||
}
|
||||
|
||||
public remove(feed: Feed) {
|
||||
const listener = this.feeds.get(feed);
|
||||
|
||||
if (!listener) {
|
||||
return;
|
||||
}
|
||||
|
||||
feed.events.removeListener('disconnect', listener);
|
||||
|
||||
this.feeds.delete(feed);
|
||||
}
|
||||
|
||||
public broadcast(message: FeedMessage.Message) {
|
||||
const queue = this.messages.length === 0;
|
||||
|
||||
this.messages.push(message);
|
||||
|
||||
if (queue) {
|
||||
process.nextTick(this.sendMessages);
|
||||
}
|
||||
}
|
||||
|
||||
private sendMessages = () => {
|
||||
const data = FeedMessage.serialize(this.messages);
|
||||
this.messages = [];
|
||||
this.each(feed => feed.sendData(data));
|
||||
}
|
||||
private sendMessages = () => {
|
||||
const data = FeedMessage.serialize(this.messages);
|
||||
this.messages = [];
|
||||
this.each(feed => feed.sendData(data));
|
||||
}
|
||||
}
|
||||
|
||||
+166
-166
@@ -9,199 +9,199 @@ const TIMEOUT = (1000 * 60 * 1) as Types.Milliseconds; // 1 minute
|
||||
const nextId = idGenerator<Types.NodeId>();
|
||||
|
||||
export default class Node {
|
||||
public readonly id: Types.NodeId;
|
||||
public readonly name: Types.NodeName;
|
||||
public readonly chain: Types.ChainLabel;
|
||||
public readonly implementation: Types.NodeImplementation;
|
||||
public readonly version: Types.NodeVersion;
|
||||
public readonly id: Types.NodeId;
|
||||
public readonly name: Types.NodeName;
|
||||
public readonly chain: Types.ChainLabel;
|
||||
public readonly implementation: Types.NodeImplementation;
|
||||
public readonly version: Types.NodeVersion;
|
||||
|
||||
public readonly events = new EventEmitter();
|
||||
public readonly events = new EventEmitter();
|
||||
|
||||
public lastMessage: Types.Timestamp;
|
||||
public config: string;
|
||||
public best = '' as Types.BlockHash;
|
||||
public height = 0 as Types.BlockNumber;
|
||||
public latency = 0 as Types.Milliseconds;
|
||||
public blockTime = 0 as Types.Milliseconds;
|
||||
public blockTimestamp = 0 as Types.Timestamp;
|
||||
public lastMessage: Types.Timestamp;
|
||||
public config: string;
|
||||
public best = '' as Types.BlockHash;
|
||||
public height = 0 as Types.BlockNumber;
|
||||
public latency = 0 as Types.Milliseconds;
|
||||
public blockTime = 0 as Types.Milliseconds;
|
||||
public blockTimestamp = 0 as Types.Timestamp;
|
||||
|
||||
private peers = 0 as Types.PeerCount;
|
||||
private txcount = 0 as Types.TransactionCount;
|
||||
private peers = 0 as Types.PeerCount;
|
||||
private txcount = 0 as Types.TransactionCount;
|
||||
|
||||
private readonly socket: WebSocket;
|
||||
private blockTimes: Array<number> = new Array(BLOCK_TIME_HISTORY);
|
||||
private lastBlockAt: Maybe<Date> = null;
|
||||
private readonly socket: WebSocket;
|
||||
private blockTimes: Array<number> = new Array(BLOCK_TIME_HISTORY);
|
||||
private lastBlockAt: Maybe<Date> = null;
|
||||
|
||||
constructor(
|
||||
socket: WebSocket,
|
||||
name: Types.NodeName,
|
||||
chain: Types.ChainLabel,
|
||||
config: string,
|
||||
implentation: Types.NodeImplementation,
|
||||
version: Types.NodeVersion,
|
||||
) {
|
||||
this.id = nextId();
|
||||
this.name = name;
|
||||
this.chain = chain;
|
||||
this.config = config;
|
||||
this.implementation = implentation;
|
||||
this.version = version;
|
||||
this.lastMessage = timestamp();
|
||||
this.socket = socket;
|
||||
constructor(
|
||||
socket: WebSocket,
|
||||
name: Types.NodeName,
|
||||
chain: Types.ChainLabel,
|
||||
config: string,
|
||||
implentation: Types.NodeImplementation,
|
||||
version: Types.NodeVersion,
|
||||
) {
|
||||
this.id = nextId();
|
||||
this.name = name;
|
||||
this.chain = chain;
|
||||
this.config = config;
|
||||
this.implementation = implentation;
|
||||
this.version = version;
|
||||
this.lastMessage = timestamp();
|
||||
this.socket = socket;
|
||||
|
||||
socket.on('message', (data) => {
|
||||
const message = parseMessage(data);
|
||||
socket.on('message', (data) => {
|
||||
const message = parseMessage(data);
|
||||
|
||||
if (!message) {
|
||||
return;
|
||||
}
|
||||
if (!message) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.lastMessage = timestamp();
|
||||
this.updateLatency(message.ts);
|
||||
this.lastMessage = timestamp();
|
||||
this.updateLatency(message.ts);
|
||||
|
||||
const update = getBestBlock(message);
|
||||
const update = getBestBlock(message);
|
||||
|
||||
if (update) {
|
||||
this.updateBestBlock(update);
|
||||
}
|
||||
if (update) {
|
||||
this.updateBestBlock(update);
|
||||
}
|
||||
|
||||
if (message.msg === 'system.interval') {
|
||||
this.onSystemInterval(message);
|
||||
}
|
||||
});
|
||||
if (message.msg === 'system.interval') {
|
||||
this.onSystemInterval(message);
|
||||
}
|
||||
});
|
||||
|
||||
socket.on('close', () => {
|
||||
console.log(`${this.name} has disconnected`);
|
||||
socket.on('close', () => {
|
||||
console.log(`${this.name} has disconnected`);
|
||||
|
||||
this.disconnect();
|
||||
});
|
||||
this.disconnect();
|
||||
});
|
||||
|
||||
socket.on('error', (error) => {
|
||||
console.error(`${this.name} has errored`, error);
|
||||
socket.on('error', (error) => {
|
||||
console.error(`${this.name} has errored`, error);
|
||||
|
||||
this.disconnect();
|
||||
});
|
||||
}
|
||||
this.disconnect();
|
||||
});
|
||||
}
|
||||
|
||||
public static fromSocket(socket: WebSocket): Promise<Node> {
|
||||
return new Promise((resolve, reject) => {
|
||||
function cleanup() {
|
||||
clearTimeout(timeout);
|
||||
socket.removeAllListeners('message');
|
||||
}
|
||||
public static fromSocket(socket: WebSocket): Promise<Node> {
|
||||
return new Promise((resolve, reject) => {
|
||||
function cleanup() {
|
||||
clearTimeout(timeout);
|
||||
socket.removeAllListeners('message');
|
||||
}
|
||||
|
||||
function handler(data: WebSocket.Data) {
|
||||
const message = parseMessage(data);
|
||||
function handler(data: WebSocket.Data) {
|
||||
const message = parseMessage(data);
|
||||
|
||||
if (message && message.msg === "system.connected") {
|
||||
cleanup();
|
||||
if (message && message.msg === "system.connected") {
|
||||
cleanup();
|
||||
|
||||
const { name, chain, config, implementation, version } = message;
|
||||
const { name, chain, config, implementation, version } = message;
|
||||
|
||||
resolve(new Node(socket, name, chain, config, implementation, version));
|
||||
}
|
||||
}
|
||||
|
||||
socket.on('message', handler);
|
||||
|
||||
const timeout = setTimeout(() => {
|
||||
cleanup();
|
||||
|
||||
socket.close();
|
||||
|
||||
return reject(new Error('Timeout on waiting for system.connected message'));
|
||||
}, 5000);
|
||||
});
|
||||
}
|
||||
|
||||
public timeoutCheck(now: Types.Timestamp) {
|
||||
if (this.lastMessage + TIMEOUT < now) {
|
||||
this.disconnect();
|
||||
resolve(new Node(socket, name, chain, config, implementation, version));
|
||||
}
|
||||
}
|
||||
|
||||
socket.on('message', handler);
|
||||
|
||||
const timeout = setTimeout(() => {
|
||||
cleanup();
|
||||
|
||||
socket.close();
|
||||
|
||||
return reject(new Error('Timeout on waiting for system.connected message'));
|
||||
}, 5000);
|
||||
});
|
||||
}
|
||||
|
||||
public timeoutCheck(now: Types.Timestamp) {
|
||||
if (this.lastMessage + TIMEOUT < now) {
|
||||
this.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
public nodeDetails(): Types.NodeDetails {
|
||||
return [this.name, this.implementation, this.version];
|
||||
}
|
||||
|
||||
public nodeStats(): Types.NodeStats {
|
||||
return [this.peers, this.txcount];
|
||||
}
|
||||
|
||||
public blockDetails(): Types.BlockDetails {
|
||||
return [this.height, this.best, this.blockTime, this.blockTimestamp];
|
||||
}
|
||||
|
||||
public get average(): number {
|
||||
let accounted = 0;
|
||||
let sum = 0;
|
||||
|
||||
for (const time of this.blockTimes) {
|
||||
if (time) {
|
||||
accounted += 1;
|
||||
sum += time;
|
||||
}
|
||||
}
|
||||
|
||||
public nodeDetails(): Types.NodeDetails {
|
||||
return [this.name, this.implementation, this.version];
|
||||
if (accounted === 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
public nodeStats(): Types.NodeStats {
|
||||
return [this.peers, this.txcount];
|
||||
return sum / accounted;
|
||||
}
|
||||
|
||||
public get localBlockAt(): Types.Milliseconds {
|
||||
if (!this.lastBlockAt) {
|
||||
return 0 as Types.Milliseconds;
|
||||
}
|
||||
|
||||
public blockDetails(): Types.BlockDetails {
|
||||
return [this.height, this.best, this.blockTime, this.blockTimestamp];
|
||||
return +(this.lastBlockAt || 0) as Types.Milliseconds;
|
||||
}
|
||||
|
||||
private disconnect() {
|
||||
this.socket.removeAllListeners();
|
||||
this.socket.close();
|
||||
|
||||
this.events.emit('disconnect');
|
||||
}
|
||||
|
||||
private onSystemInterval(message: SystemInterval) {
|
||||
const { peers, txcount } = message;
|
||||
|
||||
if (this.peers !== peers || this.txcount !== txcount) {
|
||||
this.peers = peers;
|
||||
this.txcount = txcount;
|
||||
|
||||
this.events.emit('stats');
|
||||
}
|
||||
}
|
||||
|
||||
private updateLatency(time: Date) {
|
||||
this.latency = (this.lastMessage - +time) as Types.Milliseconds;
|
||||
}
|
||||
|
||||
private updateBestBlock(update: BestBlock) {
|
||||
const { height, ts: time, best } = update;
|
||||
|
||||
if (this.height < height) {
|
||||
const blockTime = this.getBlockTime(time);
|
||||
|
||||
this.best = best;
|
||||
this.height = height;
|
||||
this.blockTimestamp = timestamp();
|
||||
this.lastBlockAt = time;
|
||||
this.blockTimes[height % BLOCK_TIME_HISTORY] = blockTime;
|
||||
this.blockTime = blockTime;
|
||||
|
||||
this.events.emit('block');
|
||||
}
|
||||
}
|
||||
|
||||
private getBlockTime(time: Date): Types.Milliseconds {
|
||||
if (!this.lastBlockAt) {
|
||||
return 0 as Types.Milliseconds;
|
||||
}
|
||||
|
||||
public get average(): number {
|
||||
let accounted = 0;
|
||||
let sum = 0;
|
||||
|
||||
for (const time of this.blockTimes) {
|
||||
if (time) {
|
||||
accounted += 1;
|
||||
sum += time;
|
||||
}
|
||||
}
|
||||
|
||||
if (accounted === 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return sum / accounted;
|
||||
}
|
||||
|
||||
public get localBlockAt(): Types.Milliseconds {
|
||||
if (!this.lastBlockAt) {
|
||||
return 0 as Types.Milliseconds;
|
||||
}
|
||||
|
||||
return +(this.lastBlockAt || 0) as Types.Milliseconds;
|
||||
}
|
||||
|
||||
private disconnect() {
|
||||
this.socket.removeAllListeners();
|
||||
this.socket.close();
|
||||
|
||||
this.events.emit('disconnect');
|
||||
}
|
||||
|
||||
private onSystemInterval(message: SystemInterval) {
|
||||
const { peers, txcount } = message;
|
||||
|
||||
if (this.peers !== peers || this.txcount !== txcount) {
|
||||
this.peers = peers;
|
||||
this.txcount = txcount;
|
||||
|
||||
this.events.emit('stats');
|
||||
}
|
||||
}
|
||||
|
||||
private updateLatency(time: Date) {
|
||||
this.latency = (this.lastMessage - +time) as Types.Milliseconds;
|
||||
}
|
||||
|
||||
private updateBestBlock(update: BestBlock) {
|
||||
const { height, ts: time, best } = update;
|
||||
|
||||
if (this.height < height) {
|
||||
const blockTime = this.getBlockTime(time);
|
||||
|
||||
this.best = best;
|
||||
this.height = height;
|
||||
this.blockTimestamp = timestamp();
|
||||
this.lastBlockAt = time;
|
||||
this.blockTimes[height % BLOCK_TIME_HISTORY] = blockTime;
|
||||
this.blockTime = blockTime;
|
||||
|
||||
this.events.emit('block');
|
||||
}
|
||||
}
|
||||
|
||||
private getBlockTime(time: Date): Types.Milliseconds {
|
||||
if (!this.lastBlockAt) {
|
||||
return 0 as Types.Milliseconds;
|
||||
}
|
||||
|
||||
return (+time - +this.lastBlockAt) as Types.Milliseconds;
|
||||
}
|
||||
return (+time - +this.lastBlockAt) as Types.Milliseconds;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,16 +15,16 @@ console.log('Telemetry server listening on port 1024');
|
||||
console.log('Feed server listening on port 8080');
|
||||
|
||||
incomingTelemetry.on('connection', async (socket: WebSocket) => {
|
||||
try {
|
||||
const node = await Node.fromSocket(socket);
|
||||
try {
|
||||
const node = await Node.fromSocket(socket);
|
||||
|
||||
aggregator.addNode(node);
|
||||
} catch (err) {
|
||||
console.error(err);
|
||||
}
|
||||
aggregator.addNode(node);
|
||||
} catch (err) {
|
||||
console.error(err);
|
||||
}
|
||||
});
|
||||
|
||||
telemetryFeed.on('connection', (socket: WebSocket) => {
|
||||
aggregator.addFeed(new Feed(socket));
|
||||
aggregator.addFeed(new Feed(socket));
|
||||
});
|
||||
|
||||
|
||||
@@ -2,73 +2,73 @@ import { Data } from 'ws';
|
||||
import { Maybe, Types } from '@dotstats/common';
|
||||
|
||||
export function parseMessage(data: Data): Maybe<Message> {
|
||||
try {
|
||||
const message = JSON.parse(data.toString());
|
||||
try {
|
||||
const message = JSON.parse(data.toString());
|
||||
|
||||
if (message && typeof message.msg === 'string' && typeof message.ts === 'string') {
|
||||
message.ts = new Date(message.ts);
|
||||
if (message && typeof message.msg === 'string' && typeof message.ts === 'string') {
|
||||
message.ts = new Date(message.ts);
|
||||
|
||||
return message;
|
||||
}
|
||||
} catch (_) {
|
||||
console.warn('Error parsing message JSON');
|
||||
return message;
|
||||
}
|
||||
} catch (_) {
|
||||
console.warn('Error parsing message JSON');
|
||||
}
|
||||
|
||||
return null;
|
||||
return null;
|
||||
}
|
||||
|
||||
export function getBestBlock(message: Message): Maybe<BestBlock> {
|
||||
switch (message.msg) {
|
||||
case 'node.start':
|
||||
case 'system.interval':
|
||||
case 'block.import':
|
||||
return message;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
switch (message.msg) {
|
||||
case 'node.start':
|
||||
case 'system.interval':
|
||||
case 'block.import':
|
||||
return message;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
interface MessageBase {
|
||||
ts: Date,
|
||||
level: 'INFO' | 'WARN',
|
||||
ts: Date,
|
||||
level: 'INFO' | 'WARN',
|
||||
}
|
||||
|
||||
export interface BestBlock {
|
||||
best: Types.BlockHash,
|
||||
height: Types.BlockNumber,
|
||||
ts: Date,
|
||||
best: Types.BlockHash,
|
||||
height: Types.BlockNumber,
|
||||
ts: Date,
|
||||
}
|
||||
|
||||
interface SystemConnected {
|
||||
msg: 'system.connected',
|
||||
name: Types.NodeName,
|
||||
chain: Types.ChainLabel,
|
||||
config: string,
|
||||
implementation: Types.NodeImplementation,
|
||||
version: Types.NodeVersion,
|
||||
msg: 'system.connected',
|
||||
name: Types.NodeName,
|
||||
chain: Types.ChainLabel,
|
||||
config: string,
|
||||
implementation: Types.NodeImplementation,
|
||||
version: Types.NodeVersion,
|
||||
}
|
||||
|
||||
export interface SystemInterval extends BestBlock {
|
||||
msg: 'system.interval',
|
||||
txcount: Types.TransactionCount,
|
||||
peers: Types.PeerCount,
|
||||
status: 'Idle' | string, // TODO: 'Idle' | ...?
|
||||
msg: 'system.interval',
|
||||
txcount: Types.TransactionCount,
|
||||
peers: Types.PeerCount,
|
||||
status: 'Idle' | string, // TODO: 'Idle' | ...?
|
||||
}
|
||||
|
||||
interface NodeStart extends BestBlock {
|
||||
msg: 'node.start',
|
||||
msg: 'node.start',
|
||||
}
|
||||
|
||||
interface BlockImport extends BestBlock {
|
||||
msg: 'block.import',
|
||||
msg: 'block.import',
|
||||
}
|
||||
|
||||
// Union type
|
||||
export type Message = MessageBase & (
|
||||
SystemConnected |
|
||||
SystemInterval |
|
||||
NodeStart |
|
||||
BlockImport
|
||||
SystemConnected |
|
||||
SystemInterval |
|
||||
NodeStart |
|
||||
BlockImport
|
||||
);
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user