From 0b674ad8986ddfb6f72f1b8f9974840b7c918679 Mon Sep 17 00:00:00 2001 From: maciejhirsz Date: Tue, 26 Jun 2018 11:15:59 +0200 Subject: [PATCH] Node timeout --- src/aggregator.ts | 30 ++++++++++++++++++++------ src/index.ts | 28 ++++++++++++++++-------- src/node.ts | 54 ++++++++++++++++++++++++++++------------------- 3 files changed, 75 insertions(+), 37 deletions(-) diff --git a/src/aggregator.ts b/src/aggregator.ts index 8e0d47c..6951a80 100644 --- a/src/aggregator.ts +++ b/src/aggregator.ts @@ -3,23 +3,41 @@ import Node from './node'; import { NodeId } from './nodeId'; export default class Aggregator extends EventEmitter { - private nodes: Map = new Map; + private _nodes: Map = new Map; public height: number = 0; - add(node: Node) { - this.nodes.set(node.id, node); + constructor() { + super(); + + setInterval(() => this.timeoutCheck(), 10000); + } + + public add(node: Node) { + this._nodes.set(node.id, node); node.once('disconnect', () => { node.removeAllListeners('block'); - this.nodes.delete(node.id); + this._nodes.delete(node.id); }); node.on('block', () => this.updateBlock(node)); } - get nodeList(): Array { - return Array.from(this.nodes.values()); + public get nodes(): IterableIterator { + return this._nodes.values(); + } + + public get length(): number { + return this._nodes.size; + } + + private timeoutCheck() { + const now = Date.now(); + + for (const node of this.nodes) { + node.timeoutCheck(now); + } } private updateBlock(node: Node) { diff --git a/src/index.ts b/src/index.ts index 297773c..dad1898 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,18 +1,23 @@ import * as WebSocket from 'ws'; import * as express from 'express'; +import { createServer } from 'http'; import Node from './node'; import Aggregator from './aggregator'; -const wss = new WebSocket.Server({ port: 1024 }); - const aggregator = new Aggregator; const app = express(); +const server = createServer(app); + +// WebSocket for Nodes feeding telemetry data to the server +const incomingTelemetry = new WebSocket.Server({ port: 1024 }); + +// WebSocket for web clients listening to the telemetry data aggregate +const telemetryFeed = new WebSocket.Server({ server }); -// respond with "hello world" when a GET request is made to the homepage app.get('/', function (req, res) { - const nodes = aggregator - .nodeList - .map((node: Node) => `${node.name} | ${node.height} | Block time ${node.blockTime / 1000}s`); + const nodes = Array + .from(aggregator.nodes) + .map((node: Node) => `${node.name} | ${node.height} | Block time ${node.blockTime / 1000}s`); res.send( @@ -26,9 +31,7 @@ ${nodes.join('\n')} ); }); -app.listen(8080); - -wss.on('connection', async (socket: WebSocket) => { +incomingTelemetry.on('connection', async (socket: WebSocket) => { try { aggregator.add(await Node.fromSocket(socket)); } catch (err) { @@ -37,3 +40,10 @@ wss.on('connection', async (socket: WebSocket) => { return; } }); + +telemetryFeed.on('connection', (socket: WebSocket) => { + socket.send('HELLO THAR!'); + socket.close(); +}); + +server.listen(8080); diff --git a/src/node.ts b/src/node.ts index 9d36df6..976b5b2 100644 --- a/src/node.ts +++ b/src/node.ts @@ -5,8 +5,10 @@ import { NodeId, getId } from './nodeId'; import { parseMessage, getBestBlock, Message, BestBlock } from './message'; const BLOCK_TIME_HISTORY = 10; +const TIMEOUT = 1000 * 60 * 5; // 5 seconds export default class Node extends EventEmitter { + public lastMessage: number; public id: NodeId; public name: string; public implementation: string; @@ -23,6 +25,7 @@ export default class Node extends EventEmitter { constructor(socket: WebSocket, name: string, config: string, implentation: string, version: string) { super(); + this.lastMessage = Date.now(); this.id = getId(); this.socket = socket; this.name = name; @@ -32,11 +35,12 @@ export default class Node extends EventEmitter { console.log(`Listening to a new node: ${name}`); - socket.on('message', (data: WebSocket.Data) => { + socket.on('message', (data) => { const message = parseMessage(data); if (!message) return; + this.lastMessage = Date.now(); this.updateLatency(message.ts); const update = getBestBlock(message); @@ -55,11 +59,11 @@ export default class Node extends EventEmitter { socket.on('error', (error) => { console.error(`${this.name} has errored`, error); - this.disconnect() + this.disconnect(); }); } - static fromSocket(socket: WebSocket): Promise { + public static fromSocket(socket: WebSocket): Promise { return new Promise((resolve, reject) => { function cleanup() { clearTimeout(timeout); @@ -90,6 +94,30 @@ export default class Node extends EventEmitter { }); } + public timeoutCheck(now: number) { + if (this.lastMessage + TIMEOUT < now) { + this.disconnect(); + } + } + + public get average(): number { + let accounted = 0; + let sum = 0; + + for (const time of this.blockTimes) { + if (time) { + accounted += 1; + sum += time; + } + } + + if (accounted === 0) { + return 0; + } + + return sum / accounted; + } + private disconnect() { this.socket.removeAllListeners('message'); this.socket.close(); @@ -98,7 +126,7 @@ export default class Node extends EventEmitter { } private updateLatency(time: Date) { - this.latency = Date.now() - +time; + this.latency = this.lastMessage - +time; } private updateBestBlock(update: BestBlock) { @@ -123,22 +151,4 @@ export default class Node extends EventEmitter { return +time - +this.lastBlockAt; } - - get average(): number { - let accounted = 0; - let sum = 0; - - for (const time of this.blockTimes) { - if (time) { - accounted += 1; - sum += time; - } - } - - if (accounted === 0) { - return 0; - } - - return sum / accounted; - } }