Node timeout

This commit is contained in:
maciejhirsz
2018-06-26 11:15:59 +02:00
parent d01c88ae16
commit 0b674ad898
3 changed files with 75 additions and 37 deletions
+24 -6
View File
@@ -3,23 +3,41 @@ import Node from './node';
import { NodeId } from './nodeId';
export default class Aggregator extends EventEmitter {
private nodes: Map<NodeId, Node> = new Map;
private _nodes: Map<NodeId, Node> = new Map;
public height: number = 0;
add(node: Node) {
this.nodes.set(node.id, node);
constructor() {
super();
setInterval(() => this.timeoutCheck(), 10000);
}
public add(node: Node) {
this._nodes.set(node.id, node);
node.once('disconnect', () => {
node.removeAllListeners('block');
this.nodes.delete(node.id);
this._nodes.delete(node.id);
});
node.on('block', () => this.updateBlock(node));
}
get nodeList(): Array<Node> {
return Array.from(this.nodes.values());
public get nodes(): IterableIterator<Node> {
return this._nodes.values();
}
public get length(): number {
return this._nodes.size;
}
private timeoutCheck() {
const now = Date.now();
for (const node of this.nodes) {
node.timeoutCheck(now);
}
}
private updateBlock(node: Node) {
+19 -9
View File
@@ -1,18 +1,23 @@
import * as WebSocket from 'ws';
import * as express from 'express';
import { createServer } from 'http';
import Node from './node';
import Aggregator from './aggregator';
const wss = new WebSocket.Server({ port: 1024 });
const aggregator = new Aggregator;
const app = express();
const server = createServer(app);
// WebSocket for Nodes feeding telemetry data to the server
const incomingTelemetry = new WebSocket.Server({ port: 1024 });
// WebSocket for web clients listening to the telemetry data aggregate
const telemetryFeed = new WebSocket.Server({ server });
// respond with "hello world" when a GET request is made to the homepage
app.get('/', function (req, res) {
const nodes = aggregator
.nodeList
.map((node: Node) => `${node.name} | ${node.height} | Block time ${node.blockTime / 1000}s`);
const nodes = Array
.from(aggregator.nodes)
.map((node: Node) => `${node.name} | ${node.height} | Block time ${node.blockTime / 1000}s`);
res.send(
@@ -26,9 +31,7 @@ ${nodes.join('\n')}
);
});
app.listen(8080);
wss.on('connection', async (socket: WebSocket) => {
incomingTelemetry.on('connection', async (socket: WebSocket) => {
try {
aggregator.add(await Node.fromSocket(socket));
} catch (err) {
@@ -37,3 +40,10 @@ wss.on('connection', async (socket: WebSocket) => {
return;
}
});
telemetryFeed.on('connection', (socket: WebSocket) => {
socket.send('HELLO THAR!');
socket.close();
});
server.listen(8080);
+32 -22
View File
@@ -5,8 +5,10 @@ import { NodeId, getId } from './nodeId';
import { parseMessage, getBestBlock, Message, BestBlock } from './message';
const BLOCK_TIME_HISTORY = 10;
const TIMEOUT = 1000 * 60 * 5; // 5 seconds
export default class Node extends EventEmitter {
public lastMessage: number;
public id: NodeId;
public name: string;
public implementation: string;
@@ -23,6 +25,7 @@ export default class Node extends EventEmitter {
constructor(socket: WebSocket, name: string, config: string, implentation: string, version: string) {
super();
this.lastMessage = Date.now();
this.id = getId();
this.socket = socket;
this.name = name;
@@ -32,11 +35,12 @@ export default class Node extends EventEmitter {
console.log(`Listening to a new node: ${name}`);
socket.on('message', (data: WebSocket.Data) => {
socket.on('message', (data) => {
const message = parseMessage(data);
if (!message) return;
this.lastMessage = Date.now();
this.updateLatency(message.ts);
const update = getBestBlock(message);
@@ -55,11 +59,11 @@ export default class Node extends EventEmitter {
socket.on('error', (error) => {
console.error(`${this.name} has errored`, error);
this.disconnect()
this.disconnect();
});
}
static fromSocket(socket: WebSocket): Promise<Node> {
public static fromSocket(socket: WebSocket): Promise<Node> {
return new Promise((resolve, reject) => {
function cleanup() {
clearTimeout(timeout);
@@ -90,6 +94,30 @@ export default class Node extends EventEmitter {
});
}
public timeoutCheck(now: number) {
if (this.lastMessage + TIMEOUT < now) {
this.disconnect();
}
}
public get average(): number {
let accounted = 0;
let sum = 0;
for (const time of this.blockTimes) {
if (time) {
accounted += 1;
sum += time;
}
}
if (accounted === 0) {
return 0;
}
return sum / accounted;
}
private disconnect() {
this.socket.removeAllListeners('message');
this.socket.close();
@@ -98,7 +126,7 @@ export default class Node extends EventEmitter {
}
private updateLatency(time: Date) {
this.latency = Date.now() - +time;
this.latency = this.lastMessage - +time;
}
private updateBestBlock(update: BestBlock) {
@@ -123,22 +151,4 @@ export default class Node extends EventEmitter {
return +time - +this.lastBlockAt;
}
get average(): number {
let accounted = 0;
let sum = 0;
for (const time of this.blockTimes) {
if (time) {
accounted += 1;
sum += time;
}
}
if (accounted === 0) {
return 0;
}
return sum / accounted;
}
}