From 280e0f5c9c30a43e9cda455f7b9b9c080f66b6dc Mon Sep 17 00:00:00 2001 From: maciejhirsz Date: Thu, 19 Jul 2018 15:23:10 +0200 Subject: [PATCH] Send ping to the node --- packages/backend/src/Aggregator.ts | 6 ++++-- packages/backend/src/Chain.ts | 6 ++---- packages/backend/src/Node.ts | 28 +++++++++++++++++++++++----- 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/packages/backend/src/Aggregator.ts b/packages/backend/src/Aggregator.ts index 2c879ba..d769b69 100644 --- a/packages/backend/src/Aggregator.ts +++ b/packages/backend/src/Aggregator.ts @@ -2,7 +2,7 @@ import Chain from './Chain'; import Node from './Node'; import Feed from './Feed'; import FeedSet from './FeedSet'; -import { Types, FeedMessage } from '@dotstats/common'; +import { Types, FeedMessage, timestamp } from '@dotstats/common'; export default class Aggregator { private readonly chains = new Map(); @@ -83,8 +83,10 @@ export default class Aggregator { private timeoutCheck() { const empty: Types.ChainLabel[] = []; + const now = timestamp(); + for (const chain of this.chains.values()) { - chain.timeoutCheck(); + chain.timeoutCheck(now); } } } diff --git a/packages/backend/src/Chain.ts b/packages/backend/src/Chain.ts index 9863da8..26ae587 100644 --- a/packages/backend/src/Chain.ts +++ b/packages/backend/src/Chain.ts @@ -2,7 +2,7 @@ import * as EventEmitter from 'events'; import Node from './Node'; import Feed from './Feed'; import FeedSet from './FeedSet'; -import { timestamp, Maybe, Types, FeedMessage } from '@dotstats/common'; +import { Maybe, Types, FeedMessage } from '@dotstats/common'; const BLOCK_TIME_HISTORY = 10; @@ -69,9 +69,7 @@ export default class Chain { return this.nodes.values(); } - public timeoutCheck() { - const now = timestamp(); - + public timeoutCheck(now: Types.Timestamp) { for (const node of this.nodes.values()) { node.timeoutCheck(now); } diff --git a/packages/backend/src/Node.ts b/packages/backend/src/Node.ts index 13858d8..a301a5d 100644 --- a/packages/backend/src/Node.ts +++ b/packages/backend/src/Node.ts @@ -14,6 +14,8 @@ export interface NodeEvents { emit(event: 'location', location: Location): void; } +function noop() {} + export default class Node { public readonly id: Types.NodeId; public readonly name: Types.NodeName; @@ -40,6 +42,7 @@ export default class Node { private readonly socket: WebSocket; private blockTimes: Array = new Array(BLOCK_TIME_HISTORY); private lastBlockAt: Maybe = null; + private pingStart = 0 as Types.Timestamp; constructor( ip: string, @@ -61,6 +64,8 @@ export default class Node { this.lastMessage = timestamp(); this.socket = socket; + let start = Date.now(); + socket.on('message', (data) => { const message = parseMessage(data); @@ -83,6 +88,11 @@ export default class Node { this.disconnect(); }); + socket.on('pong', () => { + this.latency = (timestamp() - this.pingStart) as Types.Milliseconds; + this.pingStart = 0 as Types.Timestamp; + }); + // Handle cached messages for (const message of messages) { this.onMessage(message); @@ -135,7 +145,7 @@ export default class Node { const timeout = setTimeout(() => { cleanup(); - socket.close(); + socket.terminate(); return reject(new Error('Timeout on waiting for system.connected message')); }, 5000); @@ -145,6 +155,8 @@ export default class Node { public timeoutCheck(now: Types.Timestamp) { if (this.lastMessage + TIMEOUT < now) { this.disconnect(); + } else { + this.updateLatency(now); } } @@ -194,14 +206,13 @@ export default class Node { private disconnect() { this.socket.removeAllListeners(); - this.socket.close(); + this.socket.terminate(); this.events.emit('disconnect'); } private onMessage(message: Message) { this.lastMessage = timestamp(); - this.updateLatency(message.ts); const update = getBestBlock(message); @@ -225,8 +236,15 @@ export default class Node { } } - private updateLatency(time: Date) { - this.latency = (this.lastMessage - +time) as Types.Milliseconds; + private updateLatency(now: Types.Timestamp) { + // if (this.pingStart) { + // console.error(`${this.name} timed out on ping message.`); + // this.disconnect(); + // return; + // } + + this.pingStart = now; + this.socket.ping(noop); } private updateBestBlock(update: BestBlock) {