This repository was archived by the owner on Jan 1, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindex.js
More file actions
114 lines (102 loc) · 3.7 KB
/
Copy pathindex.js
File metadata and controls
114 lines (102 loc) · 3.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
'use strict';
const _ = require('lodash');
const rc = require('rc');
const once = require('@rootstream/once');
const debug = require('debug')('monologue');
const assert = require('assert');
const uniqid = require('uniqid');
const Promise = require('bluebird');
const WebSocket = require('ws');
const { EventEmitter2 } = require('eventemitter2');
/**
 * RPC-style client that talks to a websocket endpoint.
 *
 * Outgoing requests are sent as `{ to, payload: { type: 'REQ', data: { token, name, args } } }`
 * and are answered by the peer with `{ payload: { type: 'ACK', data: { token, ret } } }`.
 * Incoming `REQ` messages are served by the first listener registered on this emitter under the
 * requested name; its (awaited) return value is sent back to the sender as the `ACK`.
 */
class MonologueClient extends EventEmitter2 {
  /**
   * @param {object} [opts] - per-instance overrides for the `opts` section of the module config
   *   (`endpoint`, `timeout`, `apiKey`, ...). The object is not modified.
   */
  constructor(opts) {
    // NOTE(review): maxListeners always comes from the module config; a per-instance
    // `opts.listeners` is stored in `_opts` but not applied here - confirm that is intended.
    super({ wildcard: true, maxListeners: getConfig().opts.listeners });
    this._id = uniqid('client');
    // Merge into a fresh object: `_.defaultsDeep` mutates its first argument, so passing `opts`
    // directly would write the defaults back into the caller's object.
    this._opts = _.defaultsDeep({}, opts, getConfig().opts);
    // token -> resolver of the pending `call()`. A Map (rather than an Array/plain object) so a
    // token received from the wire such as "push" or "constructor" can never resolve to an
    // inherited prototype member.
    this._callbacks = new Map();
    this._connected = false;
    this._connectionId = '';
    debug('a new client is created. id=%s opts=%o', this._id, this._opts);
    // Public entry points are the internal implementations wrapped by `once`.
    // NOTE(review): presumably this de-duplicates overlapping invocations - confirm against
    // the @rootstream/once documentation.
    this.connect = once(this._doConnect.bind(this), { reentrant: true });
    this.close = once(this._doClose.bind(this), { reentrant: true });
  }

  /**
   * Identifier the endpoint returned for this connection in reply to the `whoami` packet.
   * Empty string while disconnected.
   */
  get connectionId() {
    return this._connectionId;
  }

  /**
   * Opens the websocket, sends `whoami` and waits for the first message, which is stored as the
   * connection id. After that every further message is routed to `_messageLoop`.
   *
   * @returns {Promise<void>} resolves once the handshake completed.
   * @throws rejects with bluebird's timeout error when the handshake does not complete within
   *   `opts.timeout` milliseconds; throws an AssertionError when already connected.
   */
  async _doConnect() {
    assert.ok(!this._connected);
    debug('connecting to websocket endpoint: %s', this._opts.endpoint);
    const ws = new WebSocket(this._opts.endpoint, { headers: { 'x-api-key': this._opts.apiKey } });
    this._ws = ws;
    // Socket-level close/error trigger a best-effort close(); failures are only logged.
    const silentClose = async () => {
      await this.close().catch(debug);
    };
    ws.once('close', silentClose);
    ws.once('error', silentClose);
    try {
      await new Promise(resolve => {
        ws.once('open', () => {
          debug('connection is open for %s - sending whoami packet', this._id);
          ws.send('whoami');
          ws.once('message', data => {
            debug('whoami packet for %s:%s', this._id, data);
            this._connectionId = data;
            this._connected = true;
            ws.on('message', this._messageLoop.bind(this));
            resolve();
          });
        });
      }).timeout(+this._opts.timeout);
    } catch (err) {
      // The handshake did not finish in time. Without this cleanup the socket would stay open
      // and a late whoami reply could still mark the client as connected even though connect()
      // has already rejected.
      debug('handshake for %s failed: %o', this._id, err);
      ws.removeAllListeners('open');
      ws.removeAllListeners('message');
      this._connected = false;
      this._connectionId = '';
      // The one-shot 'error' listener above may already have fired; keep a listener attached so
      // an 'error' emitted while tearing the socket down is logged instead of being raised as an
      // unhandled 'error' event.
      ws.on('error', debug);
      try {
        ws.close();
      } catch (closeErr) {
        debug(closeErr);
      }
      throw err; // same rejection the caller received before the cleanup was added
    }
  }

  /**
   * Handles one incoming message. Never throws: any failure (invalid JSON, no handler for a
   * REQ, unknown token in an ACK, handler error) is logged through `debug` and the message is
   * dropped.
   *
   * @param {*} what - raw message as delivered by the websocket; must be JSON text.
   */
  async _messageLoop(what) {
    try {
      debug('message received for %s:%s', this._id, what);
      const { from, payload } = JSON.parse(what);
      const type = _.get(payload, 'type', '');
      const token = _.get(payload, 'data.token', 'invalid');
      if (type === 'REQ') {
        const name = _.get(payload, 'data.name', '');
        const args = _.get(payload, 'data.args', []);
        // Only the first listener registered for `name` answers the request.
        const fn = _.first(this.listeners(name));
        assert.ok(fn);
        const ret = await fn.apply(null, args);
        this._ws.send(JSON.stringify({ to: from, payload: { data: { token, ret }, type: 'ACK' } }));
      }
      if (type === 'ACK') {
        const settle = this._callbacks.get(token);
        assert.ok(settle);
        const ret = _.get(payload, 'data.ret');
        // Forget the token before resolving so a duplicate ACK is rejected by the assert above.
        this._callbacks.delete(token);
        settle(ret);
      }
    } catch (err) {
      debug('error while processing message %s: %o', what, err);
    }
  }

  /**
   * Invokes the handler registered as `name` on the peer identified by `to`.
   *
   * @param {*} to - destination, forwarded verbatim as the `to` field of the request.
   * @param {string} name - name of the remote handler.
   * @param {...*} args - arguments for the remote handler; must be JSON-serializable.
   * @returns {Promise<*>} the `ret` value carried by the matching ACK.
   * @throws rejects with bluebird's timeout error when no ACK arrives within `opts.timeout`
   *   milliseconds; throws an AssertionError when the client is not connected.
   */
  async call(to, name, ...args) {
    assert.ok(this._connected);
    const token = uniqid(this._id);
    this._ws.send(JSON.stringify({ to, payload: { data: { token, name, args }, type: 'REQ' } }));
    return await new Promise(resolve => {
      this._callbacks.set(token, resolve);
    })
      .timeout(+this._opts.timeout)
      .catch(err => {
        debug('call with token %s expired without a response: %o', token, err);
        this._callbacks.delete(token);
        throw err; // return back to caller
      });
  }

  /**
   * Stops message processing, closes the websocket and resets the connection state.
   *
   * @throws AssertionError when the client is not connected.
   */
  async _doClose() {
    assert.ok(this._connected);
    this._ws.removeAllListeners('message');
    this._ws.close();
    this._connected = false;
    this._connectionId = '';
  }
}
// Built-in defaults for the client options. MonologueClient reads them via getConfig().opts.
const DEFAULT_CONFIG = {
  opts: {
    endpoint: '',
    timeout: 5000,
    listeners: 20,
    apiKey: '',
  },
};

// Configuration resolved by `rc` for the "monologue" application name, seeded with the defaults.
const USER_CONFIG = rc('monologue', DEFAULT_CONFIG);

// Effective configuration: a shallow copy of the defaults overlaid with whatever `rc` returned.
const CONFIG = _.assign({}, DEFAULT_CONFIG, USER_CONFIG);

/**
 * @returns {object} the effective module configuration computed at load time.
 */
function getConfig() {
  return CONFIG;
}

// Turn on bluebird's cancellation feature for the promises created by this module.
Promise.config({ cancellation: true });

module.exports = MonologueClient;