Frontend stuff

This commit is contained in:
maciejhirsz
2018-07-02 16:07:16 +02:00
parent fe2419e54b
commit 1eea35b7ce
204 changed files with 659 additions and 227 deletions
+21 -7
View File
@@ -1,11 +1,12 @@
import * as EventEmitter from 'events';
import Node from './node';
import Feed, { FeedData } from './feed';
import { Types, IdSet } from '@dotstats/common';
import Feed from './feed';
import { Types, IdSet, FeedMessage } from '@dotstats/common';
export default class Aggregator extends EventEmitter {
private nodes = new IdSet<Types.NodeId, Node>();
private feeds = new IdSet<Types.FeedId, Feed>();
private messages: Array<FeedMessage.Message> = [];
public height = 0 as Types.BlockNumber;
@@ -33,12 +34,14 @@ export default class Aggregator extends EventEmitter {
public addFeed(feed: Feed) {
this.feeds.add(feed);
feed.send(Feed.bestBlock(this.height));
const messages = [Feed.bestBlock(this.height)];
for (const node of this.nodes.values()) {
feed.send(Feed.addedNode(node));
messages.push(Feed.addedNode(node));
}
feed.sendMessages(messages);
feed.once('disconnect', () => {
this.feeds.remove(feed);
});
@@ -48,9 +51,20 @@ export default class Aggregator extends EventEmitter {
return this.nodes.values();
}
private broadcast(data: FeedData) {
for (const feed of this.feeds.values()) {
feed.send(data);
private broadcast(message: FeedMessage.Message) {
const queue = this.messages.length === 0;
this.messages.push(message);
if (queue) {
process.nextTick(() => {
const data = FeedMessage.serialize(this.messages);
this.messages = [];
for (const feed of this.feeds.values()) {
feed.sendData(data);
}
});
}
}
+27 -33
View File
@@ -1,20 +1,10 @@
import * as WebSocket from 'ws';
import * as EventEmitter from 'events';
import Node from './node';
import { Opaque, Types, idGenerator } from '@dotstats/common';
import { Opaque, FeedMessage, Types, idGenerator } from '@dotstats/common';
const nextId = idGenerator<Types.FeedId>();
/**
* Opaque data type to be sent to the feed. Passing through
* strings means we can only serialize once, no matter how
* many feed clients are listening in.
*/
export type FeedData = Opaque<string, Types.FeedMessage>;
function serialize(msg: Types.FeedMessage): FeedData {
return JSON.stringify(msg) as FeedData;
}
const { Actions } = FeedMessage;
export default class Feed extends EventEmitter {
public id: Types.FeedId;
@@ -31,45 +21,49 @@ export default class Feed extends EventEmitter {
socket.on('close', () => this.disconnect());
}
public static bestBlock(height: Types.BlockNumber): FeedData {
return serialize({
action: 'best',
public static bestBlock(height: Types.BlockNumber): FeedMessage.Message {
return {
action: Actions.BestBlock,
payload: height
});
};
}
public static addedNode(node: Node): FeedData {
return serialize({
action: 'added',
public static addedNode(node: Node): FeedMessage.Message {
return {
action: Actions.AddedNode,
payload: [node.id, node.nodeDetails(), node.nodeStats(), node.blockDetails()]
})
};
}
public static removedNode(node: Node): FeedData {
return serialize({
action: 'removed',
public static removedNode(node: Node): FeedMessage.Message {
return {
action: Actions.RemovedNode,
payload: node.id
});
};
}
public static imported(node: Node): FeedData {
return serialize({
action: 'imported',
public static imported(node: Node): FeedMessage.Message {
return {
action: Actions.ImportedBlock,
payload: [node.id, node.blockDetails()]
});
};
}
public static stats(node: Node): FeedData {
return serialize({
action: 'stats',
public static stats(node: Node): FeedMessage.Message {
return {
action: Actions.NodeStats,
payload: [node.id, node.nodeStats()]
});
};
}
public send(data: FeedData) {
public sendData(data: FeedMessage.Data) {
this.socket.send(data);
}
public sendMessages(messages: Array<FeedMessage.Message>) {
this.socket.send(FeedMessage.serialize(messages))
}
private disconnect() {
this.socket.removeAllListeners();
this.socket.close();