diff --git a/packages/backend/src/Aggregator.ts b/packages/backend/src/Aggregator.ts new file mode 100644 index 0000000..00c4e4d --- /dev/null +++ b/packages/backend/src/Aggregator.ts @@ -0,0 +1,84 @@ +import Chain from './Chain'; +import Node from './Node'; +import Feed from './Feed'; +import FeedSet from './FeedSet'; +import { Types, FeedMessage } from '@dotstats/common'; + +export default class Aggregator { + private readonly chains = new Map(); + private readonly feeds = new FeedSet(); + + constructor() { + setInterval(() => this.timeoutCheck(), 10000); + } + + public addNode(node: Node) { + let chain = this.getChain(node.chain); + + chain.addNode(node); + } + + public addFeed(feed: Feed) { + this.feeds.add(feed); + + for (const chain of this.chains.values()) { + feed.sendMessage(Feed.addedChain(chain.label)); + } + + feed.events.on('subscribe', (label: Types.ChainLabel) => { + const chain = this.chains.get(label); + + if (chain) { + chain.addFeed(feed); + feed.sendMessage(Feed.subscribedTo(label)); + } + }) + + feed.events.on('unsubscribe', (label: Types.ChainLabel) => { + const chain = this.chains.get(label); + + if (chain) { + chain.removeFeed(feed); + feed.sendMessage(Feed.unsubscribedFrom(label)); + } + }); + } + + private getChain(label: Types.ChainLabel): Chain { + const chain = this.chains.get(label); + + if (chain) { + return chain; + } else { + const chain = new Chain(label); + + chain.events.on('disconnect', (count: number) => { + if (count !== 0) { + return; + } + + chain.events.removeAllListeners(); + + this.chains.delete(chain.label); + + console.log(`Chain: ${label} lost all nodes`); + this.feeds.broadcast(Feed.removedChain(label)); + }); + + this.chains.set(label, chain); + + console.log(`New chain: ${label}`); + this.feeds.broadcast(Feed.addedChain(label)); + + return chain; + } + } + + private timeoutCheck() { + const empty: Types.ChainLabel[] = []; + + for (const chain of this.chains.values()) { + chain.timeoutCheck(); + } + } +} diff --git a/packages/backend/src/Chain.ts b/packages/backend/src/Chain.ts new file mode 100644 index 0000000..460253d --- /dev/null +++ b/packages/backend/src/Chain.ts @@ -0,0 +1,90 @@ +import * as EventEmitter from 'events'; +import Node from './Node'; +import Feed from './Feed'; +import FeedSet from './FeedSet'; +import { timestamp, Types, FeedMessage } from '@dotstats/common'; + +export default class Chain { + private nodes = new Set(); + private feeds = new FeedSet(); + + public readonly events = new EventEmitter(); + public readonly label: Types.ChainLabel; + + public height = 0 as Types.BlockNumber; + public blockTimestamp = 0 as Types.Timestamp; + + constructor(label: Types.ChainLabel) { + this.label = label; + } + + public get nodeCount(): number { + return this.nodes.size; + } + + public addNode(node: Node) { + console.log(`[${this.label}] new node: ${node.name}`); + + this.nodes.add(node); + this.feeds.broadcast(Feed.addedNode(node)); + + node.events.once('disconnect', () => { + node.events.removeAllListeners(); + + this.nodes.delete(node); + this.feeds.broadcast(Feed.removedNode(node)); + + this.events.emit('disconnect', this.nodeCount); + }); + + node.events.on('block', () => this.updateBlock(node)); + node.events.on('stats', () => this.feeds.broadcast(Feed.stats(node))); + } + + public addFeed(feed: Feed) { + this.feeds.add(feed); + + // TODO: this is a bit unclean, find a better way + feed.chain = this.label; + + feed.sendMessage(Feed.timeSync()); + feed.sendMessage(Feed.bestBlock(this.height, this.blockTimestamp)); + + for (const node of this.nodes.values()) { + feed.sendMessage(Feed.addedNode(node)); + } + } + + public removeFeed(feed: Feed) { + this.feeds.remove(feed); + } + + public nodeList(): IterableIterator { + return this.nodes.values(); + } + + public timeoutCheck() { + const now = timestamp(); + + for (const node of this.nodes.values()) { + node.timeoutCheck(now); + } + + this.feeds.broadcast(Feed.timeSync()); + } + + private updateBlock(node: Node) { + if (node.height > this.height) { + this.height = node.height; + this.blockTimestamp = node.blockTimestamp; + + this.feeds.broadcast(Feed.bestBlock(this.height, this.blockTimestamp)); + + console.log(`[${this.label}] New block ${this.height}`); + } + + this.feeds.broadcast(Feed.imported(node)); + + console.log(`[${this.label}] ${node.name} imported ${node.height}, block time: ${node.blockTime / 1000}s, average: ${node.average / 1000}s | latency ${node.latency}`); + } +} diff --git a/packages/backend/src/Feed.ts b/packages/backend/src/Feed.ts new file mode 100644 index 0000000..e55b38d --- /dev/null +++ b/packages/backend/src/Feed.ts @@ -0,0 +1,136 @@ +import * as WebSocket from 'ws'; +import * as EventEmitter from 'events'; +import Node from './Node'; +import { timestamp, Maybe, FeedMessage, Types, idGenerator } from '@dotstats/common'; + +const nextId = idGenerator(); +const { Actions } = FeedMessage; + +export default class Feed { + public id: Types.FeedId; + + public chain: Maybe = null; + public readonly events = new EventEmitter(); + + private socket: WebSocket; + private messages: Array = []; + + constructor(socket: WebSocket) { + this.id = nextId(); + this.socket = socket; + + socket.on('message', (data) => this.handleCommand(data.toString())); + socket.on('error', () => this.disconnect()); + socket.on('close', () => this.disconnect()); + } + + public static bestBlock(height: Types.BlockNumber, ts: Types.Timestamp): FeedMessage.Message { + return { + action: Actions.BestBlock, + payload: [height, ts] + }; + } + + public static addedNode(node: Node): FeedMessage.Message { + return { + action: Actions.AddedNode, + payload: [node.id, node.nodeDetails(), node.nodeStats(), node.blockDetails()] + }; + } + + public static removedNode(node: Node): FeedMessage.Message { + return { + action: Actions.RemovedNode, + payload: node.id + }; + } + + public static imported(node: Node): FeedMessage.Message { + return { + action: Actions.ImportedBlock, + payload: [node.id, node.blockDetails()] + }; + } + + public static stats(node: Node): FeedMessage.Message { + return { + action: Actions.NodeStats, + payload: [node.id, node.nodeStats()] + }; + } + + public static timeSync(): FeedMessage.Message { + return { + action: Actions.TimeSync, + payload: timestamp() + }; + } + + public static addedChain(label: Types.ChainLabel): FeedMessage.Message { + return { + action: Actions.AddedChain, + payload: label + }; + } + + public static removedChain(label: Types.ChainLabel): FeedMessage.Message { + return { + action: Actions.RemovedChain, + payload: label + } + } + + public static subscribedTo(label: Types.ChainLabel): FeedMessage.Message { + return { + action: Actions.SubscribedTo, + payload: label, + } + } + + public static unsubscribedFrom(label: Types.ChainLabel): FeedMessage.Message { + return { + action: Actions.UnsubscribedFrom, + payload: label, + } + } + + public sendData(data: FeedMessage.Data) { + this.socket.send(data); + } + + public sendMessage(message: FeedMessage.Message) { + const queue = this.messages.length === 0; + + this.messages.push(message); + + if (queue) { + process.nextTick(this.sendMessages); + } + } + + private sendMessages = () => { + const data = FeedMessage.serialize(this.messages); + this.messages = []; + this.socket.send(data); + } + + private handleCommand(cmd: string) { + if (cmd.startsWith('subscribe:')) { + if (this.chain) { + this.events.emit('unsubscribe', this.chain); + this.chain = null; + } + + const label = cmd.substr(10) as Types.ChainLabel; + + this.events.emit('subscribe', label); + } + } + + private disconnect() { + this.socket.removeAllListeners(); + this.socket.close(); + + this.events.emit('disconnect'); + } +} diff --git a/packages/backend/src/FeedSet.ts b/packages/backend/src/FeedSet.ts new file mode 100644 index 0000000..8c48348 --- /dev/null +++ b/packages/backend/src/FeedSet.ts @@ -0,0 +1,55 @@ +import Feed from './Feed'; +import { FeedMessage } from '@dotstats/common'; + +type DisconnectListener = () => void; + +export default class FeedSet { + private feeds = new Map(); + private messages: Array = []; + + public values(): IterableIterator { + return this.feeds.keys(); + } + + public each(fn: (feed: Feed) => void) { + for (const feed of this.values()) { + fn(feed); + } + } + + public add(feed: Feed) { + const listener = () => this.remove(feed); + + this.feeds.set(feed, listener); + + feed.events.once('disconnect', listener); + } + + public remove(feed: Feed) { + const listener = this.feeds.get(feed); + + if (!listener) { + return; + } + + feed.events.removeListener('disconnect', listener); + + this.feeds.delete(feed); + } + + public broadcast(message: FeedMessage.Message) { + const queue = this.messages.length === 0; + + this.messages.push(message); + + if (queue) { + process.nextTick(this.sendMessages); + } + } + + private sendMessages = () => { + const data = FeedMessage.serialize(this.messages); + this.messages = []; + this.each(feed => feed.sendData(data)); + } +} diff --git a/packages/backend/src/node.ts b/packages/backend/src/Node.ts similarity index 87% rename from packages/backend/src/node.ts rename to packages/backend/src/Node.ts index 053a337..0df1c5d 100644 --- a/packages/backend/src/node.ts +++ b/packages/backend/src/Node.ts @@ -8,12 +8,16 @@ const TIMEOUT = (1000 * 60 * 1) as Types.Milliseconds; // 1 minute const nextId = idGenerator(); -export default class Node extends EventEmitter { +export default class Node { + public readonly id: Types.NodeId; + public readonly name: Types.NodeName; + public readonly chain: Types.ChainLabel; + public readonly implementation: Types.NodeImplementation; + public readonly version: Types.NodeVersion; + + public readonly events = new EventEmitter(); + public lastMessage: Types.Timestamp; - public id: Types.NodeId; - public name: Types.NodeName; - public implementation: Types.NodeImplementation; - public version: Types.NodeVersion; public config: string; public best = '' as Types.BlockHash; public height = 0 as Types.BlockNumber; @@ -24,28 +28,26 @@ export default class Node extends EventEmitter { private peers = 0 as Types.PeerCount; private txcount = 0 as Types.TransactionCount; - private socket: WebSocket; + private readonly socket: WebSocket; private blockTimes: Array = new Array(BLOCK_TIME_HISTORY); private lastBlockAt: Maybe = null; constructor( socket: WebSocket, name: Types.NodeName, + chain: Types.ChainLabel, config: string, implentation: Types.NodeImplementation, version: Types.NodeVersion, ) { - super(); - - this.lastMessage = timestamp(); this.id = nextId(); - this.socket = socket; this.name = name; + this.chain = chain; this.config = config; this.implementation = implentation; this.version = version; - - console.log(`Listening to a new node: ${name}`); + this.lastMessage = timestamp(); + this.socket = socket; socket.on('message', (data) => { const message = parseMessage(data); @@ -94,9 +96,9 @@ export default class Node extends EventEmitter { if (message && message.msg === "system.connected") { cleanup(); - const { name, config, implementation, version } = message; + const { name, chain, config, implementation, version } = message; - resolve(new Node(socket, name, config, implementation, version)); + resolve(new Node(socket, name, chain, config, implementation, version)); } } @@ -160,7 +162,7 @@ export default class Node extends EventEmitter { this.socket.removeAllListeners(); this.socket.close(); - this.emit('disconnect'); + this.events.emit('disconnect'); } private onSystemInterval(message: SystemInterval) { @@ -170,7 +172,7 @@ export default class Node extends EventEmitter { this.peers = peers; this.txcount = txcount; - this.emit('stats'); + this.events.emit('stats'); } } @@ -191,7 +193,7 @@ export default class Node extends EventEmitter { this.blockTimes[height % BLOCK_TIME_HISTORY] = blockTime; this.blockTime = blockTime; - this.emit('block'); + this.events.emit('block'); } } diff --git a/packages/backend/src/aggregator.ts b/packages/backend/src/aggregator.ts deleted file mode 100644 index 4e2af6e..0000000 --- a/packages/backend/src/aggregator.ts +++ /dev/null @@ -1,96 +0,0 @@ -import * as EventEmitter from 'events'; -import Node from './node'; -import Feed from './feed'; -import { timestamp, Types, FeedMessage } from '@dotstats/common'; - -export default class Aggregator extends EventEmitter { - private nodes = new Set(); - private feeds = new Set(); - private messages: Array = []; - - public height = 0 as Types.BlockNumber; - public blockTimestamp = 0 as Types.Timestamp; - - constructor() { - super(); - - setInterval(() => this.timeoutCheck(), 10000); - } - - public addNode(node: Node) { - this.nodes.add(node); - this.broadcast(Feed.addedNode(node)); - - node.once('disconnect', () => { - node.removeAllListeners(); - - this.nodes.delete(node); - this.broadcast(Feed.removedNode(node)); - }); - - node.on('block', () => this.updateBlock(node)); - node.on('stats', () => this.broadcast(Feed.stats(node))); - } - - public addFeed(feed: Feed) { - this.feeds.add(feed); - - const messages = [Feed.timeSync(), Feed.bestBlock(this.height, this.blockTimestamp)]; - - for (const node of this.nodes.values()) { - messages.push(Feed.addedNode(node)); - } - - feed.sendMessages(messages); - - feed.once('disconnect', () => { - this.feeds.delete(feed); - }); - } - - public nodeList(): IterableIterator { - return this.nodes.values(); - } - - private broadcast(message: FeedMessage.Message) { - const queue = this.messages.length === 0; - - this.messages.push(message); - - if (queue) { - process.nextTick(() => { - const data = FeedMessage.serialize(this.messages); - this.messages = []; - - for (const feed of this.feeds.values()) { - feed.sendData(data); - } - }); - } - } - - private timeoutCheck() { - const now = timestamp(); - - for (const node of this.nodes.values()) { - node.timeoutCheck(now); - } - - this.broadcast(Feed.timeSync()); - } - - private updateBlock(node: Node) { - if (node.height > this.height) { - this.height = node.height; - this.blockTimestamp = node.blockTimestamp; - - this.broadcast(Feed.bestBlock(this.height, this.blockTimestamp)); - - console.log(`New block ${this.height}`); - } - - this.broadcast(Feed.imported(node)); - - console.log(`${node.name} imported ${node.height}, block time: ${node.blockTime / 1000}s, average: ${node.average / 1000}s | latency ${node.latency}`); - } -} diff --git a/packages/backend/src/feed.ts b/packages/backend/src/feed.ts deleted file mode 100644 index a56bc37..0000000 --- a/packages/backend/src/feed.ts +++ /dev/null @@ -1,80 +0,0 @@ -import * as WebSocket from 'ws'; -import * as EventEmitter from 'events'; -import Node from './node'; -import { timestamp, Opaque, FeedMessage, Types, idGenerator } from '@dotstats/common'; - -const nextId = idGenerator(); -const { Actions } = FeedMessage; - -export default class Feed extends EventEmitter { - public id: Types.FeedId; - - private socket: WebSocket; - - constructor(socket: WebSocket) { - super(); - - this.id = nextId(); - this.socket = socket; - - socket.on('error', () => this.disconnect()); - socket.on('close', () => this.disconnect()); - } - - public static bestBlock(height: Types.BlockNumber, ts: Types.Timestamp): FeedMessage.Message { - return { - action: Actions.BestBlock, - payload: [height, ts] - }; - } - - public static addedNode(node: Node): FeedMessage.Message { - return { - action: Actions.AddedNode, - payload: [node.id, node.nodeDetails(), node.nodeStats(), node.blockDetails()] - }; - } - - public static removedNode(node: Node): FeedMessage.Message { - return { - action: Actions.RemovedNode, - payload: node.id - }; - } - - public static imported(node: Node): FeedMessage.Message { - return { - action: Actions.ImportedBlock, - payload: [node.id, node.blockDetails()] - }; - } - - public static stats(node: Node): FeedMessage.Message { - return { - action: Actions.NodeStats, - payload: [node.id, node.nodeStats()] - }; - } - - public static timeSync(): FeedMessage.Message { - return { - action: Actions.TimeSync, - payload: timestamp() - }; - } - - public sendData(data: FeedMessage.Data) { - this.socket.send(data); - } - - public sendMessages(messages: Array) { - this.socket.send(FeedMessage.serialize(messages)) - } - - private disconnect() { - this.socket.removeAllListeners(); - this.socket.close(); - - this.emit('disconnect'); - } -} diff --git a/packages/backend/src/index.ts b/packages/backend/src/index.ts index 294c481..4496294 100644 --- a/packages/backend/src/index.ts +++ b/packages/backend/src/index.ts @@ -1,9 +1,9 @@ import * as WebSocket from 'ws'; -import Node from './node'; -import Feed from './feed'; -import Aggregator from './aggregator'; +import Node from './Node'; +import Feed from './Feed'; +import Aggregator from './Aggregator'; -const aggregator = new Aggregator; +const aggregator = new Aggregator(); // WebSocket for Nodes feeding telemetry data to the server const incomingTelemetry = new WebSocket.Server({ port: 1024 }); @@ -16,7 +16,9 @@ console.log('Feed server listening on port 8080'); incomingTelemetry.on('connection', async (socket: WebSocket) => { try { - aggregator.addNode(await Node.fromSocket(socket)); + const node = await Node.fromSocket(socket); + + aggregator.addNode(node); } catch (err) { console.error(err); } diff --git a/packages/backend/src/message.ts b/packages/backend/src/message.ts index cff5500..7a24172 100644 --- a/packages/backend/src/message.ts +++ b/packages/backend/src/message.ts @@ -42,7 +42,7 @@ export interface BestBlock { interface SystemConnected { msg: 'system.connected', name: Types.NodeName, - chain: string, + chain: Types.ChainLabel, config: string, implementation: Types.NodeImplementation, version: Types.NodeVersion, diff --git a/packages/common/src/feed.ts b/packages/common/src/feed.ts index 31570fa..ff162b9 100644 --- a/packages/common/src/feed.ts +++ b/packages/common/src/feed.ts @@ -1,5 +1,5 @@ import { Opaque } from './helpers'; -import { NodeId, NodeDetails, NodeStats, BlockNumber, BlockDetails, Timestamp } from './types'; +import { NodeId, NodeDetails, NodeStats, BlockNumber, BlockDetails, Timestamp, ChainLabel } from './types'; export const Actions = { BestBlock: 0 as 0, @@ -8,6 +8,10 @@ export const Actions = { ImportedBlock: 3 as 3, NodeStats: 4 as 4, TimeSync: 5 as 5, + AddedChain: 6 as 6, + RemovedChain: 7 as 7, + SubscribedTo: 8 as 8, + UnsubscribedFrom: 9 as 9 }; export type Action = typeof Actions[keyof typeof Actions]; @@ -47,6 +51,26 @@ export namespace Variants { action: typeof Actions.TimeSync; payload: Timestamp; } + + export interface AddedChainMessage extends MessageBase { + action: typeof Actions.AddedChain; + payload: ChainLabel; + } + + export interface RemovedChainMessage extends MessageBase { + action: typeof Actions.RemovedChain; + payload: ChainLabel; + } + + export interface SubscribedToMessage extends MessageBase { + action: typeof Actions.SubscribedTo; + payload: ChainLabel; + } + + export interface UnsubscribedFromMessage extends MessageBase { + action: typeof Actions.UnsubscribedFrom; + payload: ChainLabel; + } } export type Message = @@ -55,7 +79,11 @@ export type Message = | Variants.RemovedNodeMessage | Variants.ImportedBlockMessage | Variants.NodeStatsMessage - | Variants.TimeSyncMessage; + | Variants.TimeSyncMessage + | Variants.AddedChainMessage + | Variants.RemovedChainMessage + | Variants.SubscribedToMessage + | Variants.UnsubscribedFromMessage; /** * Opaque data type to be sent to the feed. Passing through diff --git a/packages/common/src/types.ts b/packages/common/src/types.ts index 855b25a..61f3cbe 100644 --- a/packages/common/src/types.ts +++ b/packages/common/src/types.ts @@ -1,6 +1,7 @@ import { Opaque } from './helpers'; import { Id } from './id'; +export type ChainLabel = Opaque; export type FeedId = Id<'Feed'>; export type NodeId = Id<'Node'>; export type NodeName = Opaque; diff --git a/packages/frontend/src/App.css b/packages/frontend/src/App.css index 9a00e3f..2b2734a 100644 --- a/packages/frontend/src/App.css +++ b/packages/frontend/src/App.css @@ -5,7 +5,8 @@ .App-header { width: 100%; - background: #fff; + background: #eee; + color: #000; } .App-logo { diff --git a/packages/frontend/src/App.tsx b/packages/frontend/src/App.tsx index 95266d5..eae2f80 100644 --- a/packages/frontend/src/App.tsx +++ b/packages/frontend/src/App.tsx @@ -1,6 +1,6 @@ import * as React from 'react'; import { Types } from '@dotstats/common'; -import { Node, Icon, Tile, Ago } from './components'; +import { Chains, Node, Icon, Tile, Ago } from './components'; import { Connection } from './message'; import { State } from './state'; import { formatNumber } from './utils'; @@ -20,22 +20,33 @@ export default class App extends React.Component<{}, State> { best: 0 as Types.BlockNumber, blockTimestamp: 0 as Types.Timestamp, timeDiff: 0 as Types.Milliseconds, + subscribed: null, + chains: new Set(), nodes: new Map() }; + private connection: Promise; + constructor(props: {}) { super(props); - this.connect(); + this.connection = Connection.create((changes) => { + if (changes) { + this.setState(changes); + } + + return this.state; + }); } public render() { - const { best, blockTimestamp, timeDiff } = this.state; + const { best, blockTimestamp, timeDiff, chains, subscribed } = this.state; Ago.timeDiff = timeDiff; return (
+
#{formatNumber(best)} @@ -63,16 +74,6 @@ export default class App extends React.Component<{}, State> { ); } - private async connect() { - Connection.create((changes) => { - if (changes) { - this.setState(changes); - } - - return this.state; - }); - } - private nodes(): Node.Props[] { return Array.from(this.state.nodes.values()).sort((a, b) => b.blockDetails[0] - a.blockDetails[0]); } diff --git a/packages/frontend/src/components/Chains.css b/packages/frontend/src/components/Chains.css new file mode 100644 index 0000000..43309d5 --- /dev/null +++ b/packages/frontend/src/components/Chains.css @@ -0,0 +1,30 @@ +.Chains { + background: #3c3c3b; + color: #fff; + padding: 0.25em 1em; +} + +.Chains .Icon { + margin-right: 1em; +} + +.Chains-chain { + padding: 0.25em 0.75em; + margin-left: 0.5em; + background: #222; + color: #999; + border-radius: 0.3em; + display: inline-block; + cursor: pointer; + font-family: monospace, sans-serif; + font-size: 0.8em; + font-weight: bold; +} + +.Chains-chain-selected { + background: #eee; + color: #000; + border-radius: 0.3em 0.3em 0 0; + padding-bottom: 1em; + margin-bottom: -1em; +} diff --git a/packages/frontend/src/components/Chains.tsx b/packages/frontend/src/components/Chains.tsx new file mode 100644 index 0000000..2808243 --- /dev/null +++ b/packages/frontend/src/components/Chains.tsx @@ -0,0 +1,50 @@ +import * as React from 'react'; +import { Connection } from '../message'; +import { Icon } from './Icon'; +import { Types, Maybe } from '@dotstats/common'; + +import chainIcon from '../icons/link.svg'; +import './Chains.css'; + +export namespace Chains { + export interface Props { + chains: Set, + subscribed: Maybe, + connection: Promise + } +} + +export class Chains extends React.Component { + public render() { + return ( +
+ + { + this.chains.map((chain) => this.renderChain(chain)) + } +
+ ); + } + + private renderChain(chain: Types.ChainLabel): React.ReactNode { + const className = chain === this.props.subscribed + ? 'Chains-chain Chains-chain-selected' + : 'Chains-chain'; + + return ( + + {chain} + + ) + } + + private get chains(): Types.ChainLabel[] { + return Array.from(this.props.chains); + } + + private async subscribe(chain: Types.ChainLabel) { + const connection = await this.props.connection; + + connection.subscribe(chain); + } +} diff --git a/packages/frontend/src/components/index.ts b/packages/frontend/src/components/index.ts index 03bf546..31279c8 100644 --- a/packages/frontend/src/components/index.ts +++ b/packages/frontend/src/components/index.ts @@ -1,3 +1,4 @@ +export * from './Chains'; export * from './Icon'; export * from './Node'; export * from './Tile'; diff --git a/packages/frontend/src/message.ts b/packages/frontend/src/message.ts index 27192ab..636cea4 100644 --- a/packages/frontend/src/message.ts +++ b/packages/frontend/src/message.ts @@ -63,6 +63,10 @@ export class Connection { this.bindSocket(); } + public subscribe(chain: Types.ChainLabel) { + this.socket.send(`subscribe:${chain}`); + } + private bindSocket() { this.state = this.update({ nodes: new Map() }); this.socket.addEventListener('message', this.handleMessages); @@ -79,7 +83,8 @@ export class Connection { private handleMessages = (event: MessageEvent) => { const data = event.data as FeedMessage.Data; const nodes = this.state.nodes; - const changes = { nodes }; + const chains = this.state.chains; + const changes = { nodes, chains }; messages: for (const message of FeedMessage.deserialize(data)) { switch (message.action) { @@ -140,6 +145,44 @@ export class Connection { continue messages; } + case Actions.AddedChain: { + chains.add(message.payload); + + this.autoSubscribe(); + + break; + } + + case Actions.RemovedChain: { + chains.delete(message.payload); + + if (this.state.subscribed === message.payload) { + nodes.clear(); + + this.state = this.update({ subscribed: null, nodes, chains }); + this.autoSubscribe(); + + continue messages; + } + + break; + } + + case Actions.SubscribedTo: { + this.state = this.update({ subscribed: message.payload }); + + continue messages; + } + + case Actions.UnsubscribedFrom: { + if (this.state.subscribed === message.payload) { + nodes.clear(); + this.state = this.update({ subscribed: null, nodes }); + } + + continue messages; + } + default: { continue messages; } @@ -149,6 +192,16 @@ export class Connection { this.state = this.update(changes); } + private autoSubscribe() { + const { subscribed, chains } = this.state; + + if (subscribed == null && chains.size) { + const first = chains.values().next().value; + + this.subscribe(first); + } + } + private handleDisconnect = async () => { this.clean(); this.socket.close(); diff --git a/packages/frontend/src/state.ts b/packages/frontend/src/state.ts index cb2545f..cc5c616 100644 --- a/packages/frontend/src/state.ts +++ b/packages/frontend/src/state.ts @@ -1,11 +1,13 @@ -import { Types } from '@dotstats/common'; import { Node } from './components/Node'; +import { Types, Maybe } from '@dotstats/common'; export interface State { best: Types.BlockNumber, blockTimestamp: Types.Timestamp, timeDiff: Types.Milliseconds, - nodes: Map + subscribed: Maybe, + chains: Set, + nodes: Map, } export type Update = (changes: Pick | null) => Readonly;