Send ping to the node

This commit is contained in:
maciejhirsz
2018-07-19 15:23:10 +02:00
parent 4bdbeb311d
commit 280e0f5c9c
3 changed files with 29 additions and 11 deletions
+4 -2
View File
@@ -2,7 +2,7 @@ import Chain from './Chain';
import Node from './Node'; import Node from './Node';
import Feed from './Feed'; import Feed from './Feed';
import FeedSet from './FeedSet'; import FeedSet from './FeedSet';
import { Types, FeedMessage } from '@dotstats/common'; import { Types, FeedMessage, timestamp } from '@dotstats/common';
export default class Aggregator { export default class Aggregator {
private readonly chains = new Map<Types.ChainLabel, Chain>(); private readonly chains = new Map<Types.ChainLabel, Chain>();
@@ -83,8 +83,10 @@ export default class Aggregator {
private timeoutCheck() { private timeoutCheck() {
const empty: Types.ChainLabel[] = []; const empty: Types.ChainLabel[] = [];
const now = timestamp();
for (const chain of this.chains.values()) { for (const chain of this.chains.values()) {
chain.timeoutCheck(); chain.timeoutCheck(now);
} }
} }
} }
+2 -4
View File
@@ -2,7 +2,7 @@ import * as EventEmitter from 'events';
import Node from './Node'; import Node from './Node';
import Feed from './Feed'; import Feed from './Feed';
import FeedSet from './FeedSet'; import FeedSet from './FeedSet';
import { timestamp, Maybe, Types, FeedMessage } from '@dotstats/common'; import { Maybe, Types, FeedMessage } from '@dotstats/common';
const BLOCK_TIME_HISTORY = 10; const BLOCK_TIME_HISTORY = 10;
@@ -69,9 +69,7 @@ export default class Chain {
return this.nodes.values(); return this.nodes.values();
} }
public timeoutCheck() { public timeoutCheck(now: Types.Timestamp) {
const now = timestamp();
for (const node of this.nodes.values()) { for (const node of this.nodes.values()) {
node.timeoutCheck(now); node.timeoutCheck(now);
} }
+23 -5
View File
@@ -14,6 +14,8 @@ export interface NodeEvents {
emit(event: 'location', location: Location): void; emit(event: 'location', location: Location): void;
} }
function noop() {}
export default class Node { export default class Node {
public readonly id: Types.NodeId; public readonly id: Types.NodeId;
public readonly name: Types.NodeName; public readonly name: Types.NodeName;
@@ -40,6 +42,7 @@ export default class Node {
private readonly socket: WebSocket; private readonly socket: WebSocket;
private blockTimes: Array<number> = new Array(BLOCK_TIME_HISTORY); private blockTimes: Array<number> = new Array(BLOCK_TIME_HISTORY);
private lastBlockAt: Maybe<Date> = null; private lastBlockAt: Maybe<Date> = null;
private pingStart = 0 as Types.Timestamp;
constructor( constructor(
ip: string, ip: string,
@@ -61,6 +64,8 @@ export default class Node {
this.lastMessage = timestamp(); this.lastMessage = timestamp();
this.socket = socket; this.socket = socket;
let start = Date.now();
socket.on('message', (data) => { socket.on('message', (data) => {
const message = parseMessage(data); const message = parseMessage(data);
@@ -83,6 +88,11 @@ export default class Node {
this.disconnect(); this.disconnect();
}); });
socket.on('pong', () => {
this.latency = (timestamp() - this.pingStart) as Types.Milliseconds;
this.pingStart = 0 as Types.Timestamp;
});
// Handle cached messages // Handle cached messages
for (const message of messages) { for (const message of messages) {
this.onMessage(message); this.onMessage(message);
@@ -135,7 +145,7 @@ export default class Node {
const timeout = setTimeout(() => { const timeout = setTimeout(() => {
cleanup(); cleanup();
socket.close(); socket.terminate();
return reject(new Error('Timeout on waiting for system.connected message')); return reject(new Error('Timeout on waiting for system.connected message'));
}, 5000); }, 5000);
@@ -145,6 +155,8 @@ export default class Node {
public timeoutCheck(now: Types.Timestamp) { public timeoutCheck(now: Types.Timestamp) {
if (this.lastMessage + TIMEOUT < now) { if (this.lastMessage + TIMEOUT < now) {
this.disconnect(); this.disconnect();
} else {
this.updateLatency(now);
} }
} }
@@ -194,14 +206,13 @@ export default class Node {
private disconnect() { private disconnect() {
this.socket.removeAllListeners(); this.socket.removeAllListeners();
this.socket.close(); this.socket.terminate();
this.events.emit('disconnect'); this.events.emit('disconnect');
} }
private onMessage(message: Message) { private onMessage(message: Message) {
this.lastMessage = timestamp(); this.lastMessage = timestamp();
this.updateLatency(message.ts);
const update = getBestBlock(message); const update = getBestBlock(message);
@@ -225,8 +236,15 @@ export default class Node {
} }
} }
private updateLatency(time: Date) { private updateLatency(now: Types.Timestamp) {
this.latency = (this.lastMessage - +time) as Types.Milliseconds; // if (this.pingStart) {
// console.error(`${this.name} timed out on ping message.`);
// this.disconnect();
// return;
// }
this.pingStart = now;
this.socket.ping(noop);
} }
private updateBestBlock(update: BestBlock) { private updateBestBlock(update: BestBlock) {