Add ping to the client, reconnect on dead connections

This commit is contained in:
maciejhirsz
2018-07-25 17:49:53 +02:00
parent c12374209d
commit a909400678
4 changed files with 88 additions and 14 deletions
+32 -12
View File
@@ -93,21 +93,28 @@ export default class Feed {
return {
action: Actions.RemovedChain,
payload: label
}
};
}
public static subscribedTo(label: Types.ChainLabel): FeedMessage.Message {
return {
action: Actions.SubscribedTo,
payload: label,
}
payload: label
};
}
public static unsubscribedFrom(label: Types.ChainLabel): FeedMessage.Message {
return {
action: Actions.UnsubscribedFrom,
payload: label,
}
payload: label
};
}
public static pong(payload: string): FeedMessage.Message {
return {
action: Actions.Pong,
payload
};
}
public sendData(data: FeedMessage.Data) {
@@ -131,15 +138,28 @@ export default class Feed {
}
private handleCommand(cmd: string) {
if (cmd.startsWith('subscribe:')) {
if (this.chain) {
this.events.emit('unsubscribe', this.chain);
this.chain = null;
}
const [tag, payload] = cmd.split(':', 2) as [string, Maybe<string>];
const label = cmd.substr(10) as Types.ChainLabel;
if (!payload) {
return;
}
this.events.emit('subscribe', label);
switch (tag) {
case 'subscribe':
if (this.chain) {
this.events.emit('unsubscribe', this.chain);
this.chain = null;
}
this.events.emit('subscribe', payload as Types.ChainLabel);
break;
case 'ping':
this.sendMessage(Feed.pong(payload));
break;
default:
console.error('Unknown command tag:', tag);
}
}
+8 -1
View File
@@ -29,6 +29,7 @@ export const Actions = {
RemovedChain : 0x09 as 0x09,
SubscribedTo : 0x0A as 0x0A,
UnsubscribedFrom : 0x0B as 0x0B,
Pong : 0x0C as 0x0C,
};
export type Action = typeof Actions[keyof typeof Actions];
@@ -98,6 +99,11 @@ export namespace Variants {
action: typeof Actions.UnsubscribedFrom;
payload: ChainLabel;
}
export interface PongMessage extends MessageBase {
action: typeof Actions.Pong;
payload: string; // just echo whatever `ping` sent
}
}
export type Message =
@@ -112,7 +118,8 @@ export type Message =
| Variants.AddedChainMessage
| Variants.RemovedChainMessage
| Variants.SubscribedToMessage
| Variants.UnsubscribedFromMessage;
| Variants.UnsubscribedFromMessage
| Variants.PongMessage;
/**
* Opaque data type to be sent to the feed. Passing through
+2 -1
View File
@@ -6,4 +6,5 @@ import * as FeedMessage from './feed';
export { Types, FeedMessage };
export const VERSION: Types.FeedVersion = 5 as Types.FeedVersion;
// Increment this if breaking changes were made to types in `feed.ts`
export const VERSION: Types.FeedVersion = 6 as Types.FeedVersion;
+46
View File
@@ -57,6 +57,9 @@ export class Connection {
});
}
private pingId = 0;
private pingTimeout: NodeJS.Timer;
private pingSent: Maybe<Types.Timestamp> = null;
private socket: WebSocket;
private state: Readonly<State>;
private readonly update: Update;
@@ -72,6 +75,8 @@ export class Connection {
}
private bindSocket() {
this.ping();
this.state = this.update({
status: 'online',
nodes: new Map()
@@ -87,7 +92,42 @@ export class Connection {
this.socket.addEventListener('error', this.handleDisconnect);
}
private ping = () => {
if (this.pingSent) {
this.handleDisconnect();
return;
}
this.pingId += 1;
this.pingSent = timestamp();
this.socket.send(`ping:${this.pingId}`);
}
private pong(id: number) {
if (!this.pingSent) {
console.error('Received a pong without sending a ping first');
this.handleDisconnect();
return;
}
if (id !== this.pingId) {
console.error('pingId differs');
this.handleDisconnect();
}
const latency = timestamp() - this.pingSent;
this.pingSent = null;
console.log('latency', latency);
this.pingTimeout = setTimeout(this.ping, 30000);
}
private clean() {
clearTimeout(this.pingTimeout);
this.socket.removeEventListener('message', this.handleMessages);
this.socket.removeEventListener('close', this.handleDisconnect);
this.socket.removeEventListener('error', this.handleDisconnect);
@@ -223,6 +263,12 @@ export class Connection {
continue messages;
}
case Actions.Pong: {
this.pong(Number(message.payload));
continue messages;
}
default: {
continue messages;
}