mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-06-14 07:11:01 +00:00
Tweaking feed messaging
This commit is contained in:
+35
-10
@@ -1,9 +1,11 @@
|
|||||||
import * as EventEmitter from 'events';
|
import * as EventEmitter from 'events';
|
||||||
import Node from './node';
|
import Node from './node';
|
||||||
import { NodeId } from './nodeId';
|
import Feed, { FeedData } from './feed';
|
||||||
|
import { Id, IdSet } from './utils';
|
||||||
|
|
||||||
export default class Aggregator extends EventEmitter {
|
export default class Aggregator extends EventEmitter {
|
||||||
private _nodes: Map<NodeId, Node> = new Map;
|
private nodes: IdSet<Node> = new IdSet<Node>();
|
||||||
|
private feeds: IdSet<Feed> = new IdSet<Feed>();
|
||||||
|
|
||||||
public height: number = 0;
|
public height: number = 0;
|
||||||
|
|
||||||
@@ -13,29 +15,48 @@ export default class Aggregator extends EventEmitter {
|
|||||||
setInterval(() => this.timeoutCheck(), 10000);
|
setInterval(() => this.timeoutCheck(), 10000);
|
||||||
}
|
}
|
||||||
|
|
||||||
public add(node: Node) {
|
public addNode(node: Node) {
|
||||||
this._nodes.set(node.id, node);
|
this.nodes.add(node);
|
||||||
|
this.broadcast(Feed.addedNode(node));
|
||||||
|
|
||||||
node.once('disconnect', () => {
|
node.once('disconnect', () => {
|
||||||
node.removeAllListeners('block');
|
node.removeAllListeners('block');
|
||||||
|
|
||||||
this._nodes.delete(node.id);
|
this.nodes.remove(node);
|
||||||
|
this.broadcast(Feed.removedNode(node));
|
||||||
});
|
});
|
||||||
|
|
||||||
node.on('block', () => this.updateBlock(node));
|
node.on('block', () => this.updateBlock(node));
|
||||||
}
|
}
|
||||||
|
|
||||||
public get nodes(): IterableIterator<Node> {
|
public addFeed(feed: Feed) {
|
||||||
return this._nodes.values();
|
this.feeds.add(feed);
|
||||||
|
|
||||||
|
feed.send(Feed.bestBlock(this.height));
|
||||||
|
|
||||||
|
for (const node of this.nodes.entries) {
|
||||||
|
feed.send(Feed.addedNode(node));
|
||||||
|
}
|
||||||
|
|
||||||
|
feed.once('disconnect', () => {
|
||||||
|
this.feeds.remove(feed);
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
public get length(): number {
|
public nodeList(): IterableIterator<Node> {
|
||||||
return this._nodes.size;
|
return this.nodes.entries;
|
||||||
|
}
|
||||||
|
|
||||||
|
private broadcast(data: FeedData) {
|
||||||
|
for (const feed of this.feeds.entries) {
|
||||||
|
feed.send(data);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private timeoutCheck() {
|
private timeoutCheck() {
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
|
|
||||||
for (const node of this.nodes) {
|
for (const node of this.nodes.entries) {
|
||||||
node.timeoutCheck(now);
|
node.timeoutCheck(now);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -44,9 +65,13 @@ export default class Aggregator extends EventEmitter {
|
|||||||
if (node.height > this.height) {
|
if (node.height > this.height) {
|
||||||
this.height = node.height;
|
this.height = node.height;
|
||||||
|
|
||||||
|
this.broadcast(Feed.bestBlock(this.height));
|
||||||
|
|
||||||
console.log(`New block ${this.height}`);
|
console.log(`New block ${this.height}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.broadcast(Feed.imported(node));
|
||||||
|
|
||||||
console.log(`${node.name} imported ${node.height}, block time: ${node.blockTime / 1000}s, average: ${node.average / 1000}s | latency ${node.latency}`);
|
console.log(`${node.name} imported ${node.height}, block time: ${node.blockTime / 1000}s, average: ${node.average / 1000}s | latency ${node.latency}`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+99
@@ -0,0 +1,99 @@
|
|||||||
|
import * as WebSocket from 'ws';
|
||||||
|
import * as EventEmitter from 'events';
|
||||||
|
import Node, { NodeInfo, BlockInfo } from './node';
|
||||||
|
import { Opaque, Id, idGenerator } from './utils';
|
||||||
|
|
||||||
|
const nextId = idGenerator<Feed>();
|
||||||
|
|
||||||
|
export interface BlockInfo {
|
||||||
|
height: number;
|
||||||
|
blockTime: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface BestBlock {
|
||||||
|
action: 'best';
|
||||||
|
payload: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface AddedNode {
|
||||||
|
action: 'added';
|
||||||
|
payload: [Id<Node>, NodeInfo, BlockInfo];
|
||||||
|
}
|
||||||
|
|
||||||
|
interface RemovedNode {
|
||||||
|
action: 'removed';
|
||||||
|
payload: Id<Node>;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface Imported {
|
||||||
|
action: 'imported';
|
||||||
|
payload: [Id<Node>, BlockInfo];
|
||||||
|
}
|
||||||
|
|
||||||
|
type Message = BestBlock | AddedNode | RemovedNode | Imported;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Opaque data type to be sent to the feed. Passing through
|
||||||
|
* strings means we can only serialize once, no matter how
|
||||||
|
* many feed clients are listening in.
|
||||||
|
*/
|
||||||
|
export type FeedData = Opaque<string, Message>;
|
||||||
|
|
||||||
|
function serialize(msg: Message): FeedData {
|
||||||
|
return JSON.stringify(msg) as FeedData;
|
||||||
|
}
|
||||||
|
|
||||||
|
export default class Feed extends EventEmitter {
|
||||||
|
public id: Id<Feed>;
|
||||||
|
|
||||||
|
private socket: WebSocket;
|
||||||
|
|
||||||
|
constructor(socket: WebSocket) {
|
||||||
|
super();
|
||||||
|
|
||||||
|
this.id = nextId();
|
||||||
|
this.socket = socket;
|
||||||
|
|
||||||
|
socket.on('error', () => this.disconnect());
|
||||||
|
socket.on('close', () => this.disconnect());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static bestBlock(height: number): FeedData {
|
||||||
|
return serialize({
|
||||||
|
action: 'best',
|
||||||
|
payload: height
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public static addedNode(node: Node): FeedData {
|
||||||
|
return serialize({
|
||||||
|
action: 'added',
|
||||||
|
payload: [node.id, node.nodeInfo(), node.blockInfo()]
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
public static removedNode(node: Node): FeedData {
|
||||||
|
return serialize({
|
||||||
|
action: 'removed',
|
||||||
|
payload: node.id
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public static imported(node: Node): FeedData {
|
||||||
|
return serialize({
|
||||||
|
action: 'imported',
|
||||||
|
payload: [node.id, node.blockInfo()]
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public send(data: FeedData) {
|
||||||
|
this.socket.send(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
private disconnect() {
|
||||||
|
this.socket.removeAllListeners();
|
||||||
|
this.socket.close();
|
||||||
|
|
||||||
|
this.emit('disconnect');
|
||||||
|
}
|
||||||
|
}
|
||||||
+8
-9
@@ -2,7 +2,9 @@ import * as WebSocket from 'ws';
|
|||||||
import * as express from 'express';
|
import * as express from 'express';
|
||||||
import { createServer } from 'http';
|
import { createServer } from 'http';
|
||||||
import Node from './node';
|
import Node from './node';
|
||||||
|
import Feed from './feed';
|
||||||
import Aggregator from './aggregator';
|
import Aggregator from './aggregator';
|
||||||
|
import { map, join } from './utils';
|
||||||
|
|
||||||
const aggregator = new Aggregator;
|
const aggregator = new Aggregator;
|
||||||
const app = express();
|
const app = express();
|
||||||
@@ -15,9 +17,9 @@ const incomingTelemetry = new WebSocket.Server({ port: 1024 });
|
|||||||
const telemetryFeed = new WebSocket.Server({ server });
|
const telemetryFeed = new WebSocket.Server({ server });
|
||||||
|
|
||||||
app.get('/', function (req, res) {
|
app.get('/', function (req, res) {
|
||||||
const nodes = Array
|
function nodeInfo(node: Node) {
|
||||||
.from(aggregator.nodes)
|
return `${node.name} | ${node.height} | Block time ${node.blockTime / 1000}s`;
|
||||||
.map((node: Node) => `${node.name} | ${node.height} | Block time ${node.blockTime / 1000}s`);
|
}
|
||||||
|
|
||||||
res.send(
|
res.send(
|
||||||
|
|
||||||
@@ -25,7 +27,7 @@ app.get('/', function (req, res) {
|
|||||||
Best block: ${aggregator.height}
|
Best block: ${aggregator.height}
|
||||||
|
|
||||||
Node list:
|
Node list:
|
||||||
${nodes.join('\n')}
|
${ join(map(aggregator.nodeList(), nodeInfo), '\n') }
|
||||||
</pre>`
|
</pre>`
|
||||||
|
|
||||||
);
|
);
|
||||||
@@ -33,17 +35,14 @@ ${nodes.join('\n')}
|
|||||||
|
|
||||||
incomingTelemetry.on('connection', async (socket: WebSocket) => {
|
incomingTelemetry.on('connection', async (socket: WebSocket) => {
|
||||||
try {
|
try {
|
||||||
aggregator.add(await Node.fromSocket(socket));
|
aggregator.addNode(await Node.fromSocket(socket));
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error(err);
|
console.error(err);
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
telemetryFeed.on('connection', (socket: WebSocket) => {
|
telemetryFeed.on('connection', (socket: WebSocket) => {
|
||||||
socket.send('HELLO THAR!');
|
aggregator.addFeed(new Feed(socket));
|
||||||
socket.close();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
server.listen(8080);
|
server.listen(8080);
|
||||||
|
|||||||
@@ -1 +0,0 @@
|
|||||||
export type Maybe<T> = T | null | undefined;
|
|
||||||
+1
-1
@@ -1,5 +1,5 @@
|
|||||||
import { Data } from 'ws';
|
import { Data } from 'ws';
|
||||||
import { Maybe } from './maybe';
|
import { Maybe, Opaque } from './utils';
|
||||||
|
|
||||||
export function parseMessage(data: Data): Maybe<Message> {
|
export function parseMessage(data: Data): Maybe<Message> {
|
||||||
try {
|
try {
|
||||||
|
|||||||
+30
-5
@@ -1,15 +1,25 @@
|
|||||||
import * as WebSocket from 'ws';
|
import * as WebSocket from 'ws';
|
||||||
import * as EventEmitter from 'events';
|
import * as EventEmitter from 'events';
|
||||||
import { Maybe } from './maybe';
|
import { Maybe, Id, idGenerator } from './utils';
|
||||||
import { NodeId, getId } from './nodeId';
|
|
||||||
import { parseMessage, getBestBlock, Message, BestBlock } from './message';
|
import { parseMessage, getBestBlock, Message, BestBlock } from './message';
|
||||||
|
|
||||||
const BLOCK_TIME_HISTORY = 10;
|
const BLOCK_TIME_HISTORY = 10;
|
||||||
const TIMEOUT = 1000 * 60 * 5; // 5 seconds
|
const TIMEOUT = 1000 * 60 * 5; // 5 seconds
|
||||||
|
|
||||||
|
const nextId = idGenerator<Node>();
|
||||||
|
|
||||||
|
export interface NodeInfo {
|
||||||
|
name: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface BlockInfo {
|
||||||
|
height: number;
|
||||||
|
blockTime: number;
|
||||||
|
}
|
||||||
|
|
||||||
export default class Node extends EventEmitter {
|
export default class Node extends EventEmitter {
|
||||||
public lastMessage: number;
|
public lastMessage: number;
|
||||||
public id: NodeId;
|
public id: Id<Node>;
|
||||||
public name: string;
|
public name: string;
|
||||||
public implementation: string;
|
public implementation: string;
|
||||||
public version: string;
|
public version: string;
|
||||||
@@ -26,7 +36,7 @@ export default class Node extends EventEmitter {
|
|||||||
super();
|
super();
|
||||||
|
|
||||||
this.lastMessage = Date.now();
|
this.lastMessage = Date.now();
|
||||||
this.id = getId();
|
this.id = nextId();
|
||||||
this.socket = socket;
|
this.socket = socket;
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.config = config;
|
this.config = config;
|
||||||
@@ -100,6 +110,19 @@ export default class Node extends EventEmitter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public nodeInfo(): NodeInfo {
|
||||||
|
return {
|
||||||
|
name: this.name,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public blockInfo(): BlockInfo {
|
||||||
|
return {
|
||||||
|
height: this.height,
|
||||||
|
blockTime: this.blockTime,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
public get average(): number {
|
public get average(): number {
|
||||||
let accounted = 0;
|
let accounted = 0;
|
||||||
let sum = 0;
|
let sum = 0;
|
||||||
@@ -118,8 +141,10 @@ export default class Node extends EventEmitter {
|
|||||||
return sum / accounted;
|
return sum / accounted;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private disconnect() {
|
private disconnect() {
|
||||||
this.socket.removeAllListeners('message');
|
this.socket.removeAllListeners();
|
||||||
this.socket.close();
|
this.socket.close();
|
||||||
|
|
||||||
this.emit('disconnect');
|
this.emit('disconnect');
|
||||||
|
|||||||
@@ -1,9 +0,0 @@
|
|||||||
import { Opaque } from './opaque';
|
|
||||||
|
|
||||||
let currentId = 0;
|
|
||||||
|
|
||||||
export type NodeId = Opaque<number, "NodeId">;
|
|
||||||
|
|
||||||
export function getId(): NodeId {
|
|
||||||
return currentId++ as NodeId;
|
|
||||||
}
|
|
||||||
@@ -1,2 +0,0 @@
|
|||||||
// Hack for Opaque Types
|
|
||||||
export type Opaque<T, Label> = T & {__TYPE__: Label};
|
|
||||||
+109
@@ -0,0 +1,109 @@
|
|||||||
|
/**
|
||||||
|
* PhantomData akin to Rust, because sometimes you need to be smarter than
|
||||||
|
* the compiler.
|
||||||
|
*/
|
||||||
|
export class PhantomData<P> { private __PHANTOM__: P }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Opaque type, similar to `opaque type` in Flow, or new types in Rust/C.
|
||||||
|
* These should be produced only by manually casting `t as Opaque<T, P>`.
|
||||||
|
*
|
||||||
|
* `P` can be anything as it's never actually used. Using strings is okay:
|
||||||
|
*
|
||||||
|
* ```
|
||||||
|
* type MyType = Opaque<number, 'MyType'>;
|
||||||
|
* ```
|
||||||
|
*/
|
||||||
|
export type Opaque<T, P> = T & PhantomData<P>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Just a readable shorthand for null-ish-able types, akin to `T?` in Flow.
|
||||||
|
*/
|
||||||
|
export type Maybe<T> = T | null | undefined;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Higher order function producing new auto-incremented `Id`s.
|
||||||
|
*/
|
||||||
|
export function idGenerator<T>(): () => Id<T> {
|
||||||
|
let current = 0;
|
||||||
|
|
||||||
|
return () => current++ as Id<T>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unique type-constrained Id number.
|
||||||
|
*/
|
||||||
|
export type Id<T> = Opaque<number, T>;
|
||||||
|
|
||||||
|
interface HasId<T> {
|
||||||
|
id: Id<T>;
|
||||||
|
}
|
||||||
|
|
||||||
|
export class IdSet<T> {
|
||||||
|
private map: Map<Id<T>, T> = new Map();
|
||||||
|
|
||||||
|
public add(item: T & HasId<T>) {
|
||||||
|
this.map.set(item.id, item);
|
||||||
|
}
|
||||||
|
|
||||||
|
public remove(item: T & HasId<T>) {
|
||||||
|
this.map.delete(item.id);
|
||||||
|
}
|
||||||
|
|
||||||
|
public get entries(): IterableIterator<T> {
|
||||||
|
return this.map.values();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function* map<T, U>(iter: IterableIterator<T>, fn: (item: T) => U): IterableIterator<U> {
|
||||||
|
for (const item of iter) yield fn(item);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function* chain<T>(a: IterableIterator<T>, b: IterableIterator<T>): IterableIterator<T> {
|
||||||
|
yield* a;
|
||||||
|
yield* b;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function* zip<T, U>(a: IterableIterator<T>, b: IterableIterator<U>): IterableIterator<[T, U]> {
|
||||||
|
let itemA = a.next();
|
||||||
|
let itemB = b.next();
|
||||||
|
|
||||||
|
while (!itemA.done && !itemB.done) {
|
||||||
|
yield [itemA.value, itemB.value];
|
||||||
|
|
||||||
|
itemA = a.next();
|
||||||
|
itemB = b.next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function* take<T>(iter: IterableIterator<T>, n: number): IterableIterator<T> {
|
||||||
|
for (const item of iter) {
|
||||||
|
if (n-- === 0) return;
|
||||||
|
|
||||||
|
yield item;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function skip<T>(iter: IterableIterator<T>, n: number): IterableIterator<T> {
|
||||||
|
while (n-- !== 0 && !iter.next().done) {}
|
||||||
|
|
||||||
|
return iter;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function reduce<T, R>(iter: IterableIterator<T>, fn: (accu: R, item: T) => R, accumulator: R): R {
|
||||||
|
for (const item of iter) accumulator = fn(accumulator, item);
|
||||||
|
|
||||||
|
return accumulator;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function join(iter: IterableIterator<{ toString: () => string }>, glue: string): string {
|
||||||
|
const first = iter.next();
|
||||||
|
|
||||||
|
if (first.done) return '';
|
||||||
|
|
||||||
|
let result = first.value.toString();
|
||||||
|
|
||||||
|
for (const item of iter) result += glue + item;
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user