Added ago timers

This commit is contained in:
maciejhirsz
2018-07-05 17:04:00 +02:00
parent 81ef3ee14e
commit 01da7dfc47
19 changed files with 200 additions and 48 deletions
+8 -4
View File
@@ -1,7 +1,7 @@
import * as EventEmitter from 'events';
import Node from './node';
import Feed from './feed';
import { Types, IdSet, FeedMessage } from '@dotstats/common';
import { timestamp, Types, IdSet, FeedMessage } from '@dotstats/common';
export default class Aggregator extends EventEmitter {
private nodes = new IdSet<Types.NodeId, Node>();
@@ -9,6 +9,7 @@ export default class Aggregator extends EventEmitter {
private messages: Array<FeedMessage.Message> = [];
public height = 0 as Types.BlockNumber;
public blockTimestamp = 0 as Types.Timestamp;
constructor() {
super();
@@ -34,7 +35,7 @@ export default class Aggregator extends EventEmitter {
public addFeed(feed: Feed) {
this.feeds.add(feed);
const messages = [Feed.bestBlock(this.height)];
const messages = [Feed.timeSync(), Feed.bestBlock(this.height, this.blockTimestamp)];
for (const node of this.nodes.values()) {
messages.push(Feed.addedNode(node));
@@ -69,18 +70,21 @@ export default class Aggregator extends EventEmitter {
}
private timeoutCheck() {
const now = Date.now();
const now = timestamp();
for (const node of this.nodes.values()) {
node.timeoutCheck(now);
}
this.broadcast(Feed.timeSync());
}
private updateBlock(node: Node) {
if (node.height > this.height) {
this.height = node.height;
this.blockTimestamp = node.blockTimestamp;
this.broadcast(Feed.bestBlock(this.height));
this.broadcast(Feed.bestBlock(this.height, this.blockTimestamp));
console.log(`New block ${this.height}`);
}
+10 -3
View File
@@ -1,7 +1,7 @@
import * as WebSocket from 'ws';
import * as EventEmitter from 'events';
import Node from './node';
import { Opaque, FeedMessage, Types, idGenerator } from '@dotstats/common';
import { timestamp, Opaque, FeedMessage, Types, idGenerator } from '@dotstats/common';
const nextId = idGenerator<Types.FeedId>();
const { Actions } = FeedMessage;
@@ -21,10 +21,10 @@ export default class Feed extends EventEmitter {
socket.on('close', () => this.disconnect());
}
public static bestBlock(height: Types.BlockNumber): FeedMessage.Message {
public static bestBlock(height: Types.BlockNumber, ts: Types.Timestamp): FeedMessage.Message {
return {
action: Actions.BestBlock,
payload: height
payload: [height, ts]
};
}
@@ -56,6 +56,13 @@ export default class Feed extends EventEmitter {
};
}
public static timeSync(): FeedMessage.Message {
return {
action: Actions.TimeSync,
payload: timestamp()
};
}
public sendData(data: FeedMessage.Data) {
this.socket.send(data);
}
+3 -10
View File
@@ -1,23 +1,18 @@
import * as WebSocket from 'ws';
import * as express from 'express';
import { createServer } from 'http';
import Node from './node';
import Feed from './feed';
import Aggregator from './aggregator';
const aggregator = new Aggregator;
const app = express();
const server = createServer(app);
// WebSocket for Nodes feeding telemetry data to the server
const incomingTelemetry = new WebSocket.Server({ port: 1024 });
// WebSocket for web clients listening to the telemetry data aggregate
const telemetryFeed = new WebSocket.Server({ server });
const telemetryFeed = new WebSocket.Server({ port: 8080 });
app.get('/', function (req, res) {
res.send('See live listing at <a href="http://telemetry.polkadot.io/">telemetry.polkadot.io/<a>');
});
console.log('Telemetry server listening on port 1024');
console.log('Feed server listening on port 8080');
incomingTelemetry.on('connection', async (socket: WebSocket) => {
try {
@@ -31,5 +26,3 @@ telemetryFeed.on('connection', (socket: WebSocket) => {
aggregator.addFeed(new Feed(socket));
});
console.log('Starting server on port 8080');
server.listen(8080);
+17 -7
View File
@@ -1,15 +1,15 @@
import * as WebSocket from 'ws';
import * as EventEmitter from 'events';
import { Maybe, Types, idGenerator } from '@dotstats/common';
import { timestamp, Maybe, Types, idGenerator } from '@dotstats/common';
import { parseMessage, getBestBlock, Message, BestBlock, SystemInterval } from './message';
const BLOCK_TIME_HISTORY = 10;
const TIMEOUT = 1000 * 60 * 1; // 1 minute
const TIMEOUT = (1000 * 60 * 1) as Types.Milliseconds; // 1 minute
const nextId = idGenerator<Types.NodeId>();
export default class Node extends EventEmitter {
public lastMessage: number;
public lastMessage: Types.Timestamp;
public id: Types.NodeId;
public name: Types.NodeName;
public implementation: Types.NodeImplementation;
@@ -19,6 +19,7 @@ export default class Node extends EventEmitter {
public height = 0 as Types.BlockNumber;
public latency = 0 as Types.Milliseconds;
public blockTime = 0 as Types.Milliseconds;
public blockTimestamp = 0 as Types.Timestamp;
private peers = 0 as Types.PeerCount;
private txcount = 0 as Types.TransactionCount;
@@ -36,7 +37,7 @@ export default class Node extends EventEmitter {
) {
super();
this.lastMessage = Date.now();
this.lastMessage = timestamp();
this.id = nextId();
this.socket = socket;
this.name = name;
@@ -53,7 +54,7 @@ export default class Node extends EventEmitter {
return;
}
this.lastMessage = Date.now();
this.lastMessage = timestamp();
this.updateLatency(message.ts);
const update = getBestBlock(message);
@@ -111,7 +112,7 @@ export default class Node extends EventEmitter {
});
}
public timeoutCheck(now: number) {
public timeoutCheck(now: Types.Timestamp) {
if (this.lastMessage + TIMEOUT < now) {
this.disconnect();
}
@@ -126,7 +127,7 @@ export default class Node extends EventEmitter {
}
public blockDetails(): Types.BlockDetails {
return [this.height, this.best, this.blockTime];
return [this.height, this.best, this.blockTime, this.blockTimestamp];
}
public get average(): number {
@@ -147,6 +148,14 @@ export default class Node extends EventEmitter {
return sum / accounted;
}
public get localBlockAt(): Types.Milliseconds {
if (!this.lastBlockAt) {
return 0 as Types.Milliseconds;
}
return +(this.lastBlockAt || 0) as Types.Milliseconds;
}
private disconnect() {
this.socket.removeAllListeners();
this.socket.close();
@@ -177,6 +186,7 @@ export default class Node extends EventEmitter {
this.best = best;
this.height = height;
this.blockTimestamp = timestamp();
this.lastBlockAt = time;
this.blockTimes[height % BLOCK_TIME_HISTORY] = blockTime;
this.blockTime = blockTime;