mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-06-12 23:51:02 +00:00
Reorganized repo using yarn and workspaces
This commit is contained in:
@@ -0,0 +1,77 @@
|
||||
import * as EventEmitter from 'events';
|
||||
import Node from './node';
|
||||
import Feed, { FeedData } from './feed';
|
||||
import { Id, IdSet } from '@dotstats/common';
|
||||
|
||||
export default class Aggregator extends EventEmitter {
|
||||
private nodes: IdSet<Node> = new IdSet<Node>();
|
||||
private feeds: IdSet<Feed> = new IdSet<Feed>();
|
||||
|
||||
public height: number = 0;
|
||||
|
||||
constructor() {
|
||||
super();
|
||||
|
||||
setInterval(() => this.timeoutCheck(), 10000);
|
||||
}
|
||||
|
||||
public addNode(node: Node) {
|
||||
this.nodes.add(node);
|
||||
this.broadcast(Feed.addedNode(node));
|
||||
|
||||
node.once('disconnect', () => {
|
||||
node.removeAllListeners('block');
|
||||
|
||||
this.nodes.remove(node);
|
||||
this.broadcast(Feed.removedNode(node));
|
||||
});
|
||||
|
||||
node.on('block', () => this.updateBlock(node));
|
||||
}
|
||||
|
||||
public addFeed(feed: Feed) {
|
||||
this.feeds.add(feed);
|
||||
|
||||
feed.send(Feed.bestBlock(this.height));
|
||||
|
||||
for (const node of this.nodes.entries) {
|
||||
feed.send(Feed.addedNode(node));
|
||||
}
|
||||
|
||||
feed.once('disconnect', () => {
|
||||
this.feeds.remove(feed);
|
||||
})
|
||||
}
|
||||
|
||||
public nodeList(): IterableIterator<Node> {
|
||||
return this.nodes.entries;
|
||||
}
|
||||
|
||||
private broadcast(data: FeedData) {
|
||||
for (const feed of this.feeds.entries) {
|
||||
feed.send(data);
|
||||
}
|
||||
}
|
||||
|
||||
private timeoutCheck() {
|
||||
const now = Date.now();
|
||||
|
||||
for (const node of this.nodes.entries) {
|
||||
node.timeoutCheck(now);
|
||||
}
|
||||
}
|
||||
|
||||
private updateBlock(node: Node) {
|
||||
if (node.height > this.height) {
|
||||
this.height = node.height;
|
||||
|
||||
this.broadcast(Feed.bestBlock(this.height));
|
||||
|
||||
console.log(`New block ${this.height}`);
|
||||
}
|
||||
|
||||
this.broadcast(Feed.imported(node));
|
||||
|
||||
console.log(`${node.name} imported ${node.height}, block time: ${node.blockTime / 1000}s, average: ${node.average / 1000}s | latency ${node.latency}`);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,99 @@
|
||||
import * as WebSocket from 'ws';
|
||||
import * as EventEmitter from 'events';
|
||||
import Node, { NodeInfo, BlockInfo } from './node';
|
||||
import { Opaque, Id, idGenerator } from '@dotstats/common';
|
||||
|
||||
const nextId = idGenerator<Feed>();
|
||||
|
||||
export interface BlockInfo {
|
||||
height: number;
|
||||
blockTime: number;
|
||||
}
|
||||
|
||||
interface BestBlock {
|
||||
action: 'best';
|
||||
payload: number;
|
||||
}
|
||||
|
||||
interface AddedNode {
|
||||
action: 'added';
|
||||
payload: [Id<Node>, NodeInfo, BlockInfo];
|
||||
}
|
||||
|
||||
interface RemovedNode {
|
||||
action: 'removed';
|
||||
payload: Id<Node>;
|
||||
}
|
||||
|
||||
interface Imported {
|
||||
action: 'imported';
|
||||
payload: [Id<Node>, BlockInfo];
|
||||
}
|
||||
|
||||
type Message = BestBlock | AddedNode | RemovedNode | Imported;
|
||||
|
||||
/**
|
||||
* Opaque data type to be sent to the feed. Passing through
|
||||
* strings means we can only serialize once, no matter how
|
||||
* many feed clients are listening in.
|
||||
*/
|
||||
export type FeedData = Opaque<string, Message>;
|
||||
|
||||
function serialize(msg: Message): FeedData {
|
||||
return JSON.stringify(msg) as FeedData;
|
||||
}
|
||||
|
||||
export default class Feed extends EventEmitter {
|
||||
public id: Id<Feed>;
|
||||
|
||||
private socket: WebSocket;
|
||||
|
||||
constructor(socket: WebSocket) {
|
||||
super();
|
||||
|
||||
this.id = nextId();
|
||||
this.socket = socket;
|
||||
|
||||
socket.on('error', () => this.disconnect());
|
||||
socket.on('close', () => this.disconnect());
|
||||
}
|
||||
|
||||
public static bestBlock(height: number): FeedData {
|
||||
return serialize({
|
||||
action: 'best',
|
||||
payload: height
|
||||
});
|
||||
}
|
||||
|
||||
public static addedNode(node: Node): FeedData {
|
||||
return serialize({
|
||||
action: 'added',
|
||||
payload: [node.id, node.nodeInfo(), node.blockInfo()]
|
||||
})
|
||||
}
|
||||
|
||||
public static removedNode(node: Node): FeedData {
|
||||
return serialize({
|
||||
action: 'removed',
|
||||
payload: node.id
|
||||
});
|
||||
}
|
||||
|
||||
public static imported(node: Node): FeedData {
|
||||
return serialize({
|
||||
action: 'imported',
|
||||
payload: [node.id, node.blockInfo()]
|
||||
});
|
||||
}
|
||||
|
||||
public send(data: FeedData) {
|
||||
this.socket.send(data);
|
||||
}
|
||||
|
||||
private disconnect() {
|
||||
this.socket.removeAllListeners();
|
||||
this.socket.close();
|
||||
|
||||
this.emit('disconnect');
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
import * as WebSocket from 'ws';
|
||||
import * as express from 'express';
|
||||
import { createServer } from 'http';
|
||||
import Node from './node';
|
||||
import Feed from './feed';
|
||||
import Aggregator from './aggregator';
|
||||
import { map, join } from '@dotstats/common';
|
||||
|
||||
const aggregator = new Aggregator;
|
||||
const app = express();
|
||||
const server = createServer(app);
|
||||
|
||||
// WebSocket for Nodes feeding telemetry data to the server
|
||||
const incomingTelemetry = new WebSocket.Server({ port: 1024 });
|
||||
|
||||
// WebSocket for web clients listening to the telemetry data aggregate
|
||||
const telemetryFeed = new WebSocket.Server({ server });
|
||||
|
||||
app.get('/', function (req, res) {
|
||||
function nodeInfo(node: Node) {
|
||||
return `${node.name} | ${node.height} | Block time ${node.blockTime / 1000}s`;
|
||||
}
|
||||
|
||||
res.send(
|
||||
|
||||
`<pre>
|
||||
Best block: ${aggregator.height}
|
||||
|
||||
Node list:
|
||||
${ join(map(aggregator.nodeList(), nodeInfo), '\n') }
|
||||
</pre>`
|
||||
|
||||
);
|
||||
});
|
||||
|
||||
incomingTelemetry.on('connection', async (socket: WebSocket) => {
|
||||
try {
|
||||
aggregator.addNode(await Node.fromSocket(socket));
|
||||
} catch (err) {
|
||||
console.error(err);
|
||||
}
|
||||
});
|
||||
|
||||
telemetryFeed.on('connection', (socket: WebSocket) => {
|
||||
aggregator.addFeed(new Feed(socket));
|
||||
});
|
||||
|
||||
console.log('Starting server on port 8080');
|
||||
server.listen(8080);
|
||||
@@ -0,0 +1,78 @@
|
||||
import { Data } from 'ws';
|
||||
import { Maybe, Opaque } from '@dotstats/common';
|
||||
|
||||
export function parseMessage(data: Data): Maybe<Message> {
|
||||
try {
|
||||
const message = JSON.parse(data.toString());
|
||||
|
||||
if (message && typeof message.msg === 'string' && typeof message.ts === 'string') {
|
||||
message.ts = new Date(message.ts);
|
||||
|
||||
return message;
|
||||
}
|
||||
} catch (_) {
|
||||
console.warn('Error parsing message JSON');
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
export function getBestBlock(message: Message): Maybe<BestBlock> {
|
||||
switch (message.msg) {
|
||||
case 'node.start':
|
||||
case 'system.interval':
|
||||
case 'block.import':
|
||||
return message;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
interface MessageBase {
|
||||
ts: Date,
|
||||
level: 'INFO' | 'WARN',
|
||||
}
|
||||
|
||||
export interface BestBlock {
|
||||
best: string,
|
||||
height: number,
|
||||
ts: Date,
|
||||
}
|
||||
|
||||
interface SystemConnected {
|
||||
msg: 'system.connected',
|
||||
name: string,
|
||||
chain: string,
|
||||
config: string,
|
||||
implementation: string,
|
||||
version: string,
|
||||
}
|
||||
|
||||
interface SystemInterval extends BestBlock {
|
||||
msg: 'system.interval',
|
||||
txcount: number,
|
||||
peers: number,
|
||||
status: 'Idle' | string, // TODO: 'Idle' | ...?
|
||||
}
|
||||
|
||||
interface NodeStart extends BestBlock {
|
||||
msg: 'node.start',
|
||||
}
|
||||
|
||||
interface BlockImport extends BestBlock {
|
||||
msg: 'block.import',
|
||||
}
|
||||
|
||||
// Union type
|
||||
export type Message = MessageBase & (
|
||||
SystemConnected |
|
||||
SystemInterval |
|
||||
NodeStart |
|
||||
BlockImport
|
||||
);
|
||||
|
||||
|
||||
// received: {"msg":"block.import","level":"INFO","ts":"2018-06-18T17:30:35.285406538+02:00","best":"3d4fdc7960078ddc9be87dddc48324a6d64afdf1f65fffe89529ce9965cd5f29","height":526}
|
||||
// received: {"msg":"node.start","level":"INFO","ts":"2018-06-18T17:30:40.038731057+02:00","best":"3d4fdc7960078ddc9be87dddc48324a6d64afdf1f65fffe89529ce9965cd5f29","height":526}
|
||||
// received: {"msg":"system.connected","level":"INFO","ts":"2018-06-18T17:30:40.038975471+02:00","chain":"dev","config":"","version":"0.2.0","implementation":"parity-polkadot","name":"Majestic Widget"}
|
||||
// received: {"msg":"system.interval","level":"INFO","ts":"2018-06-19T14:00:05.091355364+02:00","txcount":0,"best":"360c9563857308703398f637932b7ffe884e5c7b09692600ff09a4d753c9d948","height":7559,"peers":0,"status":"Idle"}
|
||||
@@ -0,0 +1,179 @@
|
||||
import * as WebSocket from 'ws';
|
||||
import * as EventEmitter from 'events';
|
||||
import { Maybe, Id, idGenerator } from '@dotstats/common';
|
||||
import { parseMessage, getBestBlock, Message, BestBlock } from './message';
|
||||
|
||||
const BLOCK_TIME_HISTORY = 10;
|
||||
const TIMEOUT = 1000 * 60 * 5; // 5 seconds
|
||||
|
||||
const nextId = idGenerator<Node>();
|
||||
|
||||
export interface NodeInfo {
|
||||
name: string;
|
||||
}
|
||||
|
||||
export interface BlockInfo {
|
||||
height: number;
|
||||
blockTime: number;
|
||||
}
|
||||
|
||||
export default class Node extends EventEmitter {
|
||||
public lastMessage: number;
|
||||
public id: Id<Node>;
|
||||
public name: string;
|
||||
public implementation: string;
|
||||
public version: string;
|
||||
public height: number = 0;
|
||||
public config: string;
|
||||
public latency: number = 0;
|
||||
public blockTime: number = 0;
|
||||
|
||||
private socket: WebSocket;
|
||||
private blockTimes: Array<number> = new Array(BLOCK_TIME_HISTORY);
|
||||
private lastBlockAt: Maybe<Date> = null;
|
||||
|
||||
constructor(socket: WebSocket, name: string, config: string, implentation: string, version: string) {
|
||||
super();
|
||||
|
||||
this.lastMessage = Date.now();
|
||||
this.id = nextId();
|
||||
this.socket = socket;
|
||||
this.name = name;
|
||||
this.config = config;
|
||||
this.implementation = implentation;
|
||||
this.version = version;
|
||||
|
||||
console.log(`Listening to a new node: ${name}`);
|
||||
|
||||
socket.on('message', (data) => {
|
||||
const message = parseMessage(data);
|
||||
|
||||
if (!message) return;
|
||||
|
||||
this.lastMessage = Date.now();
|
||||
this.updateLatency(message.ts);
|
||||
|
||||
const update = getBestBlock(message);
|
||||
|
||||
if (update) {
|
||||
this.updateBestBlock(update);
|
||||
}
|
||||
});
|
||||
|
||||
socket.on('close', () => {
|
||||
console.log(`${this.name} has disconnected`);
|
||||
|
||||
this.disconnect();
|
||||
});
|
||||
|
||||
socket.on('error', (error) => {
|
||||
console.error(`${this.name} has errored`, error);
|
||||
|
||||
this.disconnect();
|
||||
});
|
||||
}
|
||||
|
||||
public static fromSocket(socket: WebSocket): Promise<Node> {
|
||||
return new Promise((resolve, reject) => {
|
||||
function cleanup() {
|
||||
clearTimeout(timeout);
|
||||
socket.removeAllListeners('message');
|
||||
}
|
||||
|
||||
function handler(data: WebSocket.Data) {
|
||||
const message = parseMessage(data);
|
||||
|
||||
if (message && message.msg === "system.connected") {
|
||||
cleanup();
|
||||
|
||||
const { name, config, implementation, version } = message;
|
||||
|
||||
resolve(new Node(socket, name, config, implementation, version));
|
||||
}
|
||||
}
|
||||
|
||||
socket.on('message', handler);
|
||||
|
||||
const timeout = setTimeout(() => {
|
||||
cleanup();
|
||||
|
||||
socket.close();
|
||||
|
||||
return reject(new Error('Timeout on waiting for system.connected message'));
|
||||
}, 5000);
|
||||
});
|
||||
}
|
||||
|
||||
public timeoutCheck(now: number) {
|
||||
if (this.lastMessage + TIMEOUT < now) {
|
||||
this.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
public nodeInfo(): NodeInfo {
|
||||
return {
|
||||
name: this.name,
|
||||
};
|
||||
}
|
||||
|
||||
public blockInfo(): BlockInfo {
|
||||
return {
|
||||
height: this.height,
|
||||
blockTime: this.blockTime,
|
||||
};
|
||||
}
|
||||
|
||||
public get average(): number {
|
||||
let accounted = 0;
|
||||
let sum = 0;
|
||||
|
||||
for (const time of this.blockTimes) {
|
||||
if (time) {
|
||||
accounted += 1;
|
||||
sum += time;
|
||||
}
|
||||
}
|
||||
|
||||
if (accounted === 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return sum / accounted;
|
||||
}
|
||||
|
||||
|
||||
|
||||
private disconnect() {
|
||||
this.socket.removeAllListeners();
|
||||
this.socket.close();
|
||||
|
||||
this.emit('disconnect');
|
||||
}
|
||||
|
||||
private updateLatency(time: Date) {
|
||||
this.latency = this.lastMessage - +time;
|
||||
}
|
||||
|
||||
private updateBestBlock(update: BestBlock) {
|
||||
const { height, ts: time, best } = update;
|
||||
|
||||
if (this.height < height) {
|
||||
const blockTime = this.getBlockTime(time);
|
||||
|
||||
this.height = height;
|
||||
this.lastBlockAt = time;
|
||||
this.blockTimes[height % BLOCK_TIME_HISTORY] = blockTime;
|
||||
this.blockTime = blockTime;
|
||||
|
||||
this.emit('block');
|
||||
}
|
||||
}
|
||||
|
||||
private getBlockTime(time: Date): number {
|
||||
if (!this.lastBlockAt) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return +time - +this.lastBlockAt;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user