From 294ff4088237ce6f04aaaa3230c0dae3b788d540 Mon Sep 17 00:00:00 2001 From: maciejhirsz Date: Wed, 20 Jun 2018 15:20:27 +0200 Subject: [PATCH] Aggregate nodes --- src/index.ts | 14 ++++++- src/node.ts | 101 +++++++++++++++++++++++++++++++-------------------- 2 files changed, 74 insertions(+), 41 deletions(-) diff --git a/src/index.ts b/src/index.ts index 2d22256..1dc0aa6 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,7 +2,19 @@ import * as WebSocket from 'ws'; import Node from './node'; const wss = new WebSocket.Server({ port: 1024 }); +const nodes = new WeakSet(); wss.on('connection', async (socket: WebSocket) => { - await Node.fromSocket(socket); + let node: Node; + + try { + node = await Node.fromSocket(socket); + } catch (err) { + console.error(err); + + return; + } + + nodes.add(node); + node.once('disconnect', () => nodes.delete(node)); }); diff --git a/src/node.ts b/src/node.ts index f919cf2..5038483 100644 --- a/src/node.ts +++ b/src/node.ts @@ -1,10 +1,12 @@ import * as WebSocket from 'ws'; +import * as EventEmitter from 'events'; import { Maybe } from './maybe'; import { parseMessage, getBestBlock, Message, BestBlock } from './message'; const BLOCK_TIME_HISTORY = 10; -export default class Node { +export default class Node extends EventEmitter { + private id: number; private socket: WebSocket; private name: string; private config: string; @@ -13,8 +15,11 @@ export default class Node { private height: number = 0; private blockTimes: Array = new Array(BLOCK_TIME_HISTORY); private lastBlockTime: Maybe = null; + private latency: number = 0; constructor(socket: WebSocket, name: string, config: string, implentation: string, version: string) { + super(); + this.socket = socket; this.name = name; this.config = config; @@ -28,6 +33,8 @@ export default class Node { if (!message) return; + this.updateLatency(message.ts); + // console.log('received', message); const update = getBestBlock(message); @@ -36,46 +43,9 @@ export default class Node { this.updateBestBlock(update); } }); - } - private updateBestBlock(update: BestBlock) { - const { height, ts: time, best } = update; - - if (this.height < height) { - const blockTime = this.getBlockTime(time); - - this.height = height; - this.lastBlockTime = time; - this.blockTimes[height % BLOCK_TIME_HISTORY] = blockTime; - - console.log(`Best block for ${this.name} is ${this.height}, block time: ${blockTime / 1000}s, average: ${this.average / 1000}s`); - } - } - - private getBlockTime(time: Date): number { - if (!this.lastBlockTime) { - return 0; - } - - return +time - +this.lastBlockTime; - } - - get average(): number { - let accounted = 0; - let sum = 0; - - for (const time of this.blockTimes) { - if (time) { - accounted += 1; - sum += time; - } - } - - if (accounted === 0) { - return 0; - } - - return sum / accounted; + socket.on('close', () => this.disconnect()); + socket.on('error', () => this.disconnect()); } static fromSocket(socket: WebSocket): Promise { @@ -112,4 +82,55 @@ export default class Node { }, 5000); }); } + + private disconnect() { + console.log(`${this.name} has disconnected`); + + this.emit('disconnect'); + } + + private updateLatency(time: Date) { + this.latency = Date.now() - +time; + } + + private updateBestBlock(update: BestBlock) { + const { height, ts: time, best } = update; + + if (this.height < height) { + const blockTime = this.getBlockTime(time); + + this.height = height; + this.lastBlockTime = time; + this.blockTimes[height % BLOCK_TIME_HISTORY] = blockTime; + + console.log(`Best block for ${this.name} is ${this.height}, block time: ${blockTime / 1000}s, average: ${this.average / 1000}s | latency ${this.latency}`); + } + } + + private getBlockTime(time: Date): number { + if (!this.lastBlockTime) { + return 0; + } + + return +time - +this.lastBlockTime; + } + + get average(): number { + let accounted = 0; + let sum = 0; + + for (const time of this.blockTimes) { + if (time) { + accounted += 1; + sum += time; + } + } + + if (accounted === 0) { + return 0; + } + + return sum / accounted; + } + }