Move backend to a folder

This commit is contained in:
maciejhirsz
2018-06-27 10:34:03 +02:00
parent e06b684067
commit 462cb8404f
9 changed files with 0 additions and 0 deletions
+77
View File
@@ -0,0 +1,77 @@
import * as EventEmitter from 'events';
import Node from './node';
import Feed, { FeedData } from './feed';
import { Id, IdSet } from './utils';
export default class Aggregator extends EventEmitter {
private nodes: IdSet<Node> = new IdSet<Node>();
private feeds: IdSet<Feed> = new IdSet<Feed>();
public height: number = 0;
constructor() {
super();
setInterval(() => this.timeoutCheck(), 10000);
}
public addNode(node: Node) {
this.nodes.add(node);
this.broadcast(Feed.addedNode(node));
node.once('disconnect', () => {
node.removeAllListeners('block');
this.nodes.remove(node);
this.broadcast(Feed.removedNode(node));
});
node.on('block', () => this.updateBlock(node));
}
public addFeed(feed: Feed) {
this.feeds.add(feed);
feed.send(Feed.bestBlock(this.height));
for (const node of this.nodes.entries) {
feed.send(Feed.addedNode(node));
}
feed.once('disconnect', () => {
this.feeds.remove(feed);
})
}
public nodeList(): IterableIterator<Node> {
return this.nodes.entries;
}
private broadcast(data: FeedData) {
for (const feed of this.feeds.entries) {
feed.send(data);
}
}
private timeoutCheck() {
const now = Date.now();
for (const node of this.nodes.entries) {
node.timeoutCheck(now);
}
}
private updateBlock(node: Node) {
if (node.height > this.height) {
this.height = node.height;
this.broadcast(Feed.bestBlock(this.height));
console.log(`New block ${this.height}`);
}
this.broadcast(Feed.imported(node));
console.log(`${node.name} imported ${node.height}, block time: ${node.blockTime / 1000}s, average: ${node.average / 1000}s | latency ${node.latency}`);
}
}
+99
View File
@@ -0,0 +1,99 @@
import * as WebSocket from 'ws';
import * as EventEmitter from 'events';
import Node, { NodeInfo, BlockInfo } from './node';
import { Opaque, Id, idGenerator } from './utils';
const nextId = idGenerator<Feed>();
export interface BlockInfo {
height: number;
blockTime: number;
}
interface BestBlock {
action: 'best';
payload: number;
}
interface AddedNode {
action: 'added';
payload: [Id<Node>, NodeInfo, BlockInfo];
}
interface RemovedNode {
action: 'removed';
payload: Id<Node>;
}
interface Imported {
action: 'imported';
payload: [Id<Node>, BlockInfo];
}
type Message = BestBlock | AddedNode | RemovedNode | Imported;
/**
* Opaque data type to be sent to the feed. Passing through
* strings means we can only serialize once, no matter how
* many feed clients are listening in.
*/
export type FeedData = Opaque<string, Message>;
function serialize(msg: Message): FeedData {
return JSON.stringify(msg) as FeedData;
}
export default class Feed extends EventEmitter {
public id: Id<Feed>;
private socket: WebSocket;
constructor(socket: WebSocket) {
super();
this.id = nextId();
this.socket = socket;
socket.on('error', () => this.disconnect());
socket.on('close', () => this.disconnect());
}
public static bestBlock(height: number): FeedData {
return serialize({
action: 'best',
payload: height
});
}
public static addedNode(node: Node): FeedData {
return serialize({
action: 'added',
payload: [node.id, node.nodeInfo(), node.blockInfo()]
})
}
public static removedNode(node: Node): FeedData {
return serialize({
action: 'removed',
payload: node.id
});
}
public static imported(node: Node): FeedData {
return serialize({
action: 'imported',
payload: [node.id, node.blockInfo()]
});
}
public send(data: FeedData) {
this.socket.send(data);
}
private disconnect() {
this.socket.removeAllListeners();
this.socket.close();
this.emit('disconnect');
}
}
+48
View File
@@ -0,0 +1,48 @@
import * as WebSocket from 'ws';
import * as express from 'express';
import { createServer } from 'http';
import Node from './node';
import Feed from './feed';
import Aggregator from './aggregator';
import { map, join } from './utils';
const aggregator = new Aggregator;
const app = express();
const server = createServer(app);
// WebSocket for Nodes feeding telemetry data to the server
const incomingTelemetry = new WebSocket.Server({ port: 1024 });
// WebSocket for web clients listening to the telemetry data aggregate
const telemetryFeed = new WebSocket.Server({ server });
app.get('/', function (req, res) {
function nodeInfo(node: Node) {
return `${node.name} | ${node.height} | Block time ${node.blockTime / 1000}s`;
}
res.send(
`<pre>
Best block: ${aggregator.height}
Node list:
${ join(map(aggregator.nodeList(), nodeInfo), '\n') }
</pre>`
);
});
incomingTelemetry.on('connection', async (socket: WebSocket) => {
try {
aggregator.addNode(await Node.fromSocket(socket));
} catch (err) {
console.error(err);
}
});
telemetryFeed.on('connection', (socket: WebSocket) => {
aggregator.addFeed(new Feed(socket));
});
server.listen(8080);
+78
View File
@@ -0,0 +1,78 @@
import { Data } from 'ws';
import { Maybe, Opaque } from './utils';
export function parseMessage(data: Data): Maybe<Message> {
try {
const message = JSON.parse(data.toString());
if (message && typeof message.msg === 'string' && typeof message.ts === 'string') {
message.ts = new Date(message.ts);
return message;
}
} catch (_) {
console.warn('Error parsing message JSON');
}
return null;
}
export function getBestBlock(message: Message): Maybe<BestBlock> {
switch (message.msg) {
case 'node.start':
case 'system.interval':
case 'block.import':
return message;
default:
return null;
}
}
interface MessageBase {
ts: Date,
level: 'INFO' | 'WARN',
}
export interface BestBlock {
best: string,
height: number,
ts: Date,
}
interface SystemConnected {
msg: 'system.connected',
name: string,
chain: string,
config: string,
implementation: string,
version: string,
}
interface SystemInterval extends BestBlock {
msg: 'system.interval',
txcount: number,
peers: number,
status: 'Idle' | string, // TODO: 'Idle' | ...?
}
interface NodeStart extends BestBlock {
msg: 'node.start',
}
interface BlockImport extends BestBlock {
msg: 'block.import',
}
// Union type
export type Message = MessageBase & (
SystemConnected |
SystemInterval |
NodeStart |
BlockImport
);
// received: {"msg":"block.import","level":"INFO","ts":"2018-06-18T17:30:35.285406538+02:00","best":"3d4fdc7960078ddc9be87dddc48324a6d64afdf1f65fffe89529ce9965cd5f29","height":526}
// received: {"msg":"node.start","level":"INFO","ts":"2018-06-18T17:30:40.038731057+02:00","best":"3d4fdc7960078ddc9be87dddc48324a6d64afdf1f65fffe89529ce9965cd5f29","height":526}
// received: {"msg":"system.connected","level":"INFO","ts":"2018-06-18T17:30:40.038975471+02:00","chain":"dev","config":"","version":"0.2.0","implementation":"parity-polkadot","name":"Majestic Widget"}
// received: {"msg":"system.interval","level":"INFO","ts":"2018-06-19T14:00:05.091355364+02:00","txcount":0,"best":"360c9563857308703398f637932b7ffe884e5c7b09692600ff09a4d753c9d948","height":7559,"peers":0,"status":"Idle"}
+179
View File
@@ -0,0 +1,179 @@
import * as WebSocket from 'ws';
import * as EventEmitter from 'events';
import { Maybe, Id, idGenerator } from './utils';
import { parseMessage, getBestBlock, Message, BestBlock } from './message';
const BLOCK_TIME_HISTORY = 10;
const TIMEOUT = 1000 * 60 * 5; // 5 seconds
const nextId = idGenerator<Node>();
export interface NodeInfo {
name: string;
}
export interface BlockInfo {
height: number;
blockTime: number;
}
export default class Node extends EventEmitter {
public lastMessage: number;
public id: Id<Node>;
public name: string;
public implementation: string;
public version: string;
public height: number = 0;
public config: string;
public latency: number = 0;
public blockTime: number = 0;
private socket: WebSocket;
private blockTimes: Array<number> = new Array(BLOCK_TIME_HISTORY);
private lastBlockAt: Maybe<Date> = null;
constructor(socket: WebSocket, name: string, config: string, implentation: string, version: string) {
super();
this.lastMessage = Date.now();
this.id = nextId();
this.socket = socket;
this.name = name;
this.config = config;
this.implementation = implentation;
this.version = version;
console.log(`Listening to a new node: ${name}`);
socket.on('message', (data) => {
const message = parseMessage(data);
if (!message) return;
this.lastMessage = Date.now();
this.updateLatency(message.ts);
const update = getBestBlock(message);
if (update) {
this.updateBestBlock(update);
}
});
socket.on('close', () => {
console.log(`${this.name} has disconnected`);
this.disconnect();
});
socket.on('error', (error) => {
console.error(`${this.name} has errored`, error);
this.disconnect();
});
}
public static fromSocket(socket: WebSocket): Promise<Node> {
return new Promise((resolve, reject) => {
function cleanup() {
clearTimeout(timeout);
socket.removeAllListeners('message');
}
function handler(data: WebSocket.Data) {
const message = parseMessage(data);
if (message && message.msg === "system.connected") {
cleanup();
const { name, config, implementation, version } = message;
resolve(new Node(socket, name, config, implementation, version));
}
}
socket.on('message', handler);
const timeout = setTimeout(() => {
cleanup();
socket.close();
return reject(new Error('Timeout on waiting for system.connected message'));
}, 5000);
});
}
public timeoutCheck(now: number) {
if (this.lastMessage + TIMEOUT < now) {
this.disconnect();
}
}
public nodeInfo(): NodeInfo {
return {
name: this.name,
};
}
public blockInfo(): BlockInfo {
return {
height: this.height,
blockTime: this.blockTime,
};
}
public get average(): number {
let accounted = 0;
let sum = 0;
for (const time of this.blockTimes) {
if (time) {
accounted += 1;
sum += time;
}
}
if (accounted === 0) {
return 0;
}
return sum / accounted;
}
private disconnect() {
this.socket.removeAllListeners();
this.socket.close();
this.emit('disconnect');
}
private updateLatency(time: Date) {
this.latency = this.lastMessage - +time;
}
private updateBestBlock(update: BestBlock) {
const { height, ts: time, best } = update;
if (this.height < height) {
const blockTime = this.getBlockTime(time);
this.height = height;
this.lastBlockAt = time;
this.blockTimes[height % BLOCK_TIME_HISTORY] = blockTime;
this.blockTime = blockTime;
this.emit('block');
}
}
private getBlockTime(time: Date): number {
if (!this.lastBlockAt) {
return 0;
}
return +time - +this.lastBlockAt;
}
}
+109
View File
@@ -0,0 +1,109 @@
/**
* PhantomData akin to Rust, because sometimes you need to be smarter than
* the compiler.
*/
export class PhantomData<P> { private __PHANTOM__: P }
/**
* Opaque type, similar to `opaque type` in Flow, or new types in Rust/C.
* These should be produced only by manually casting `t as Opaque<T, P>`.
*
* `P` can be anything as it's never actually used. Using strings is okay:
*
* ```
* type MyType = Opaque<number, 'MyType'>;
* ```
*/
export type Opaque<T, P> = T & PhantomData<P>;
/**
* Just a readable shorthand for null-ish-able types, akin to `T?` in Flow.
*/
export type Maybe<T> = T | null | undefined;
/**
* Higher order function producing new auto-incremented `Id`s.
*/
export function idGenerator<T>(): () => Id<T> {
let current = 0;
return () => current++ as Id<T>;
}
/**
* Unique type-constrained Id number.
*/
export type Id<T> = Opaque<number, T>;
interface HasId<T> {
id: Id<T>;
}
export class IdSet<T> {
private map: Map<Id<T>, T> = new Map();
public add(item: T & HasId<T>) {
this.map.set(item.id, item);
}
public remove(item: T & HasId<T>) {
this.map.delete(item.id);
}
public get entries(): IterableIterator<T> {
return this.map.values();
}
}
export function* map<T, U>(iter: IterableIterator<T>, fn: (item: T) => U): IterableIterator<U> {
for (const item of iter) yield fn(item);
}
export function* chain<T>(a: IterableIterator<T>, b: IterableIterator<T>): IterableIterator<T> {
yield* a;
yield* b;
}
export function* zip<T, U>(a: IterableIterator<T>, b: IterableIterator<U>): IterableIterator<[T, U]> {
let itemA = a.next();
let itemB = b.next();
while (!itemA.done && !itemB.done) {
yield [itemA.value, itemB.value];
itemA = a.next();
itemB = b.next();
}
}
export function* take<T>(iter: IterableIterator<T>, n: number): IterableIterator<T> {
for (const item of iter) {
if (n-- === 0) return;
yield item;
}
}
export function skip<T>(iter: IterableIterator<T>, n: number): IterableIterator<T> {
while (n-- !== 0 && !iter.next().done) {}
return iter;
}
export function reduce<T, R>(iter: IterableIterator<T>, fn: (accu: R, item: T) => R, accumulator: R): R {
for (const item of iter) accumulator = fn(accumulator, item);
return accumulator;
}
export function join(iter: IterableIterator<{ toString: () => string }>, glue: string): string {
const first = iter.next();
if (first.done) return '';
let result = first.value.toString();
for (const item of iter) result += glue + item;
return result;
}