diff --git a/src/aggregator.ts b/src/aggregator.ts index 6951a80..cae0926 100644 --- a/src/aggregator.ts +++ b/src/aggregator.ts @@ -1,9 +1,11 @@ import * as EventEmitter from 'events'; import Node from './node'; -import { NodeId } from './nodeId'; +import Feed, { FeedData } from './feed'; +import { Id, IdSet } from './utils'; export default class Aggregator extends EventEmitter { - private _nodes: Map = new Map; + private nodes: IdSet = new IdSet(); + private feeds: IdSet = new IdSet(); public height: number = 0; @@ -13,29 +15,48 @@ export default class Aggregator extends EventEmitter { setInterval(() => this.timeoutCheck(), 10000); } - public add(node: Node) { - this._nodes.set(node.id, node); + public addNode(node: Node) { + this.nodes.add(node); + this.broadcast(Feed.addedNode(node)); + node.once('disconnect', () => { node.removeAllListeners('block'); - this._nodes.delete(node.id); + this.nodes.remove(node); + this.broadcast(Feed.removedNode(node)); }); node.on('block', () => this.updateBlock(node)); } - public get nodes(): IterableIterator { - return this._nodes.values(); + public addFeed(feed: Feed) { + this.feeds.add(feed); + + feed.send(Feed.bestBlock(this.height)); + + for (const node of this.nodes.entries) { + feed.send(Feed.addedNode(node)); + } + + feed.once('disconnect', () => { + this.feeds.remove(feed); + }) } - public get length(): number { - return this._nodes.size; + public nodeList(): IterableIterator { + return this.nodes.entries; + } + + private broadcast(data: FeedData) { + for (const feed of this.feeds.entries) { + feed.send(data); + } } private timeoutCheck() { const now = Date.now(); - for (const node of this.nodes) { + for (const node of this.nodes.entries) { node.timeoutCheck(now); } } @@ -44,9 +65,13 @@ export default class Aggregator extends EventEmitter { if (node.height > this.height) { this.height = node.height; + this.broadcast(Feed.bestBlock(this.height)); + console.log(`New block ${this.height}`); } + this.broadcast(Feed.imported(node)); + console.log(`${node.name} imported ${node.height}, block time: ${node.blockTime / 1000}s, average: ${node.average / 1000}s | latency ${node.latency}`); } } diff --git a/src/feed.ts b/src/feed.ts new file mode 100644 index 0000000..eab16f6 --- /dev/null +++ b/src/feed.ts @@ -0,0 +1,99 @@ +import * as WebSocket from 'ws'; +import * as EventEmitter from 'events'; +import Node, { NodeInfo, BlockInfo } from './node'; +import { Opaque, Id, idGenerator } from './utils'; + +const nextId = idGenerator(); + +export interface BlockInfo { + height: number; + blockTime: number; +} + +interface BestBlock { + action: 'best'; + payload: number; +} + +interface AddedNode { + action: 'added'; + payload: [Id, NodeInfo, BlockInfo]; +} + +interface RemovedNode { + action: 'removed'; + payload: Id; +} + +interface Imported { + action: 'imported'; + payload: [Id, BlockInfo]; +} + +type Message = BestBlock | AddedNode | RemovedNode | Imported; + +/** + * Opaque data type to be sent to the feed. Passing through + * strings means we can only serialize once, no matter how + * many feed clients are listening in. + */ +export type FeedData = Opaque; + +function serialize(msg: Message): FeedData { + return JSON.stringify(msg) as FeedData; +} + +export default class Feed extends EventEmitter { + public id: Id; + + private socket: WebSocket; + + constructor(socket: WebSocket) { + super(); + + this.id = nextId(); + this.socket = socket; + + socket.on('error', () => this.disconnect()); + socket.on('close', () => this.disconnect()); + } + + public static bestBlock(height: number): FeedData { + return serialize({ + action: 'best', + payload: height + }); + } + + public static addedNode(node: Node): FeedData { + return serialize({ + action: 'added', + payload: [node.id, node.nodeInfo(), node.blockInfo()] + }) + } + + public static removedNode(node: Node): FeedData { + return serialize({ + action: 'removed', + payload: node.id + }); + } + + public static imported(node: Node): FeedData { + return serialize({ + action: 'imported', + payload: [node.id, node.blockInfo()] + }); + } + + public send(data: FeedData) { + this.socket.send(data); + } + + private disconnect() { + this.socket.removeAllListeners(); + this.socket.close(); + + this.emit('disconnect'); + } +} diff --git a/src/index.ts b/src/index.ts index dad1898..c46c1ca 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,7 +2,9 @@ import * as WebSocket from 'ws'; import * as express from 'express'; import { createServer } from 'http'; import Node from './node'; +import Feed from './feed'; import Aggregator from './aggregator'; +import { map, join } from './utils'; const aggregator = new Aggregator; const app = express(); @@ -15,9 +17,9 @@ const incomingTelemetry = new WebSocket.Server({ port: 1024 }); const telemetryFeed = new WebSocket.Server({ server }); app.get('/', function (req, res) { - const nodes = Array - .from(aggregator.nodes) - .map((node: Node) => `${node.name} | ${node.height} | Block time ${node.blockTime / 1000}s`); + function nodeInfo(node: Node) { + return `${node.name} | ${node.height} | Block time ${node.blockTime / 1000}s`; + } res.send( @@ -25,7 +27,7 @@ app.get('/', function (req, res) { Best block: ${aggregator.height} Node list: -${nodes.join('\n')} +${ join(map(aggregator.nodeList(), nodeInfo), '\n') } ` ); @@ -33,17 +35,14 @@ ${nodes.join('\n')} incomingTelemetry.on('connection', async (socket: WebSocket) => { try { - aggregator.add(await Node.fromSocket(socket)); + aggregator.addNode(await Node.fromSocket(socket)); } catch (err) { console.error(err); - - return; } }); telemetryFeed.on('connection', (socket: WebSocket) => { - socket.send('HELLO THAR!'); - socket.close(); + aggregator.addFeed(new Feed(socket)); }); server.listen(8080); diff --git a/src/maybe.ts b/src/maybe.ts deleted file mode 100644 index fc44aea..0000000 --- a/src/maybe.ts +++ /dev/null @@ -1 +0,0 @@ -export type Maybe = T | null | undefined; diff --git a/src/message.ts b/src/message.ts index a40bd95..d0cdf8d 100644 --- a/src/message.ts +++ b/src/message.ts @@ -1,5 +1,5 @@ import { Data } from 'ws'; -import { Maybe } from './maybe'; +import { Maybe, Opaque } from './utils'; export function parseMessage(data: Data): Maybe { try { diff --git a/src/node.ts b/src/node.ts index 976b5b2..6a2e45d 100644 --- a/src/node.ts +++ b/src/node.ts @@ -1,15 +1,25 @@ import * as WebSocket from 'ws'; import * as EventEmitter from 'events'; -import { Maybe } from './maybe'; -import { NodeId, getId } from './nodeId'; +import { Maybe, Id, idGenerator } from './utils'; import { parseMessage, getBestBlock, Message, BestBlock } from './message'; const BLOCK_TIME_HISTORY = 10; const TIMEOUT = 1000 * 60 * 5; // 5 seconds +const nextId = idGenerator(); + +export interface NodeInfo { + name: string; +} + +export interface BlockInfo { + height: number; + blockTime: number; +} + export default class Node extends EventEmitter { public lastMessage: number; - public id: NodeId; + public id: Id; public name: string; public implementation: string; public version: string; @@ -26,7 +36,7 @@ export default class Node extends EventEmitter { super(); this.lastMessage = Date.now(); - this.id = getId(); + this.id = nextId(); this.socket = socket; this.name = name; this.config = config; @@ -100,6 +110,19 @@ export default class Node extends EventEmitter { } } + public nodeInfo(): NodeInfo { + return { + name: this.name, + }; + } + + public blockInfo(): BlockInfo { + return { + height: this.height, + blockTime: this.blockTime, + }; + } + public get average(): number { let accounted = 0; let sum = 0; @@ -118,8 +141,10 @@ export default class Node extends EventEmitter { return sum / accounted; } + + private disconnect() { - this.socket.removeAllListeners('message'); + this.socket.removeAllListeners(); this.socket.close(); this.emit('disconnect'); diff --git a/src/nodeId.ts b/src/nodeId.ts deleted file mode 100644 index b8c3b20..0000000 --- a/src/nodeId.ts +++ /dev/null @@ -1,9 +0,0 @@ -import { Opaque } from './opaque'; - -let currentId = 0; - -export type NodeId = Opaque; - -export function getId(): NodeId { - return currentId++ as NodeId; -} diff --git a/src/opaque.ts b/src/opaque.ts deleted file mode 100644 index 71a4e20..0000000 --- a/src/opaque.ts +++ /dev/null @@ -1,2 +0,0 @@ -// Hack for Opaque Types -export type Opaque = T & {__TYPE__: Label}; diff --git a/src/utils.ts b/src/utils.ts new file mode 100644 index 0000000..3933796 --- /dev/null +++ b/src/utils.ts @@ -0,0 +1,109 @@ +/** + * PhantomData akin to Rust, because sometimes you need to be smarter than + * the compiler. + */ +export class PhantomData

{ private __PHANTOM__: P } + +/** + * Opaque type, similar to `opaque type` in Flow, or new types in Rust/C. + * These should be produced only by manually casting `t as Opaque`. + * + * `P` can be anything as it's never actually used. Using strings is okay: + * + * ``` + * type MyType = Opaque; + * ``` + */ +export type Opaque = T & PhantomData

; + +/** + * Just a readable shorthand for null-ish-able types, akin to `T?` in Flow. + */ +export type Maybe = T | null | undefined; + +/** + * Higher order function producing new auto-incremented `Id`s. + */ +export function idGenerator(): () => Id { + let current = 0; + + return () => current++ as Id; +} + +/** + * Unique type-constrained Id number. + */ +export type Id = Opaque; + +interface HasId { + id: Id; +} + +export class IdSet { + private map: Map, T> = new Map(); + + public add(item: T & HasId) { + this.map.set(item.id, item); + } + + public remove(item: T & HasId) { + this.map.delete(item.id); + } + + public get entries(): IterableIterator { + return this.map.values(); + } +} + +export function* map(iter: IterableIterator, fn: (item: T) => U): IterableIterator { + for (const item of iter) yield fn(item); +} + +export function* chain(a: IterableIterator, b: IterableIterator): IterableIterator { + yield* a; + yield* b; +} + +export function* zip(a: IterableIterator, b: IterableIterator): IterableIterator<[T, U]> { + let itemA = a.next(); + let itemB = b.next(); + + while (!itemA.done && !itemB.done) { + yield [itemA.value, itemB.value]; + + itemA = a.next(); + itemB = b.next(); + } +} + +export function* take(iter: IterableIterator, n: number): IterableIterator { + for (const item of iter) { + if (n-- === 0) return; + + yield item; + } +} + +export function skip(iter: IterableIterator, n: number): IterableIterator { + while (n-- !== 0 && !iter.next().done) {} + + return iter; +} + +export function reduce(iter: IterableIterator, fn: (accu: R, item: T) => R, accumulator: R): R { + for (const item of iter) accumulator = fn(accumulator, item); + + return accumulator; +} + +export function join(iter: IterableIterator<{ toString: () => string }>, glue: string): string { + const first = iter.next(); + + if (first.done) return ''; + + let result = first.value.toString(); + + for (const item of iter) result += glue + item; + + return result; +}