Merge branch 'v1.1-dev' as v1.1.1
[websub-hub] / src / worker.js
1 'use strict';
2
3 const common = require('./common');
4
5 const _fileScope = common.fileScope(__filename);
6
7 /**
8 * Always try to do some things, but not too many.
9 * This is a generic polling promise-wrangler, keeping a set number
10 * of promises in flight, trying to replace them as they finish.
11 */
12
13 /**
14 * @callback Worker~promiseGiver
15 * @param {number} atMost
16 * @returns {Promise<void>[]}
17 */
18
class Worker {
  /**
   * @param {object} logger object providing debug() and error() methods
   * @param {Worker~promiseGiver} promiseGiver called to obtain more work
   * @param {object} options
   * @param {object} options.worker
   * @param {boolean} options.worker.pollingEnabled when falsy, start() does not schedule anything
   * @param {number} options.worker.recurrSleepMs delay between processing runs; falsy disables rescheduling
   * @param {number} options.worker.concurrency maximum number of promises kept in flight
   * @throws {TypeError} when promiseGiver is not a function
   */
  constructor(logger, promiseGiver, options) {
    this.logger = logger;
    this.options = options;
    if (typeof promiseGiver !== 'function') {
      throw new TypeError('function required');
    }
    this.promiseGiver = promiseGiver;

    this.concurrency = this.options.worker.concurrency;
    this.recurrSleepMs = this.options.worker.recurrSleepMs;
    this.inFlight = []; // Our work heap of Promises
    this.nextTimeout = undefined; // Allow clearTimeout() to reset waiting period.
    this.running = false;
    this.isProcessing = false; // Guards process() against overlapping runs.
  }

  /**
   * Begin the scheduled loop.
   * Does nothing unless options.worker.pollingEnabled is set.
   * @param {number} stagger fraction of recurrSleepMs used as the upper bound
   *   of the random delay before the first cycle is scheduled
   */
  start(stagger = 0.618) {
    const _scope = _fileScope('start');
    this.logger.debug(_scope, 'called', {});
    if (this.options.worker.pollingEnabled) {
      this.running = true;
      // A repeated start() must not leave an earlier timer behind that stop() cannot reach.
      clearTimeout(this.nextTimeout);
      // Try to keep clustered nodes from all processing at the same time.
      const staggerMs = Math.floor(Math.random() * this.recurrSleepMs * stagger);
      this.nextTimeout = setTimeout(this._recurr.bind(this), staggerMs);
    }
  }

  /**
   * Cancel the scheduled loop.
   * Only the pending timer is cancelled; a process() run already underway
   * is not interrupted, but it will not reschedule itself afterwards.
   */
  stop() {
    const _scope = _fileScope('stop');
    this.logger.debug(_scope, 'called', {});
    this.running = false;
    clearTimeout(this.nextTimeout);
    this.nextTimeout = undefined;
  }

  /**
   * The problem: Promise.race doesn't report which promise(s) settled, and
   * there is no native interface for querying promise state.
   * So we wrap all our pending-work promises with a flag and the
   * results, and use the race as a sort of condvar for checking everything
   * in the list of what we were waiting for.
   * NB this means the returned promise cannot be further chained, or the
   * chained promise loses the magic properties.
   * A promise which already carries an own 'isSettled' property is returned as-is.
   * @param {Promise} promise
   * @returns {Promise} watchedPromise exposing isSettled, resolved, and rejected getters
   */
  static watchedPromise(promise) {
    if (Object.prototype.hasOwnProperty.call(promise, 'isSettled')) {
      return promise;
    }

    let isSettled = false;
    let resolved = undefined;
    let rejected = undefined;

    // The outcome is passed through unchanged, so the wrapper settles the same way.
    const watched = promise.then(
      (res) => {
        isSettled = true;
        resolved = res;
        return res;
      },
      (rej) => {
        isSettled = true;
        rejected = rej;
        throw rej;
      });

    Object.defineProperties(watched, {
      isSettled: { get: () => isSettled },
      resolved: { get: () => resolved },
      rejected: { get: () => rejected },
    });

    return watched;
  }

  /**
   * Process the list of in-flight promises, removing any which have settled,
   * and passing their fulfilled value and rejection reason to the handler.
   *
   * @param {HandlerFunction} handler called as handler(resolved, rejected)
   * @returns {number} handled
   */
  _handleWatchedList(handler) {
    let handled = 0;
    // Walk backwards so that splicing does not disturb the indexes yet to be visited.
    for (let i = this.inFlight.length - 1; i >= 0; i--) {
      // eslint-disable-next-line security/detect-object-injection
      const p = this.inFlight[i];
      if (p.isSettled) {
        handler(p.resolved, p.rejected);
        this.inFlight.splice(i, 1);
        handled += 1;
      }
    }
    return handled;
  }

  /**
   * Refill the workpool with our special promises.
   * Asks the promiseGiver for at most as many promises as there are free
   * slots, wraps them, and appends them to the in-flight list.
   * @returns {Promise<Promise[]>} the newly added watched promises
   */
  async _getWork() {
    const _scope = _fileScope('_getWork');
    let newPromises = [];
    const wanted = this.concurrency - this.inFlight.length;
    if (wanted > 0) {
      newPromises = await this.promiseGiver(wanted);
      newPromises = newPromises.map((p) => Worker.watchedPromise(p));
      common.stackSafePush(this.inFlight, newPromises);
    }
    this.logger.debug(_scope, 'completed', { wanted, added: newPromises.length });
    return newPromises;
  }

  /**
   * Simply log results of promises, for now.
   * @param {*} resolved
   * @param {*} rejected
   */
  _watchedHandler(resolved, rejected) {
    const _scope = _fileScope('_watchedHandler');

    this.logger.debug(_scope, { resolved, rejected });
    if (rejected) {
      this.logger.error(_scope, { rejected });
    }
  }

  /**
   * Schedule the next processing run, if the loop is running and a sleep
   * interval is configured.
   */
  _recurr() {
    if (this.running && this.recurrSleepMs) {
      // Never leave an earlier pending timer orphaned; stop() can only cancel the one we track.
      clearTimeout(this.nextTimeout);
      this.nextTimeout = setTimeout(this.process.bind(this), this.recurrSleepMs);
    }
  }

  /**
   * Attempt to do as much work as we can.
   * Keeps the in-flight list topped up until no work remains, then schedules
   * the next run. A call made while a run is already underway returns
   * immediately, so the concurrency limit cannot be exceeded by overlapping
   * runs. Failures obtaining work are logged rather than thrown, so that the
   * polling loop survives them.
   * @returns {Promise<void>}
   */
  async process() {
    const _scope = _fileScope('process');

    this.logger.debug(_scope, 'called', { isProcessing: this.isProcessing });

    if (this.isProcessing) {
      return;
    }
    this.isProcessing = true;

    // Interrupt any pending sleep, if we were called out of timeout-cycle.
    clearTimeout(this.nextTimeout);
    this.nextTimeout = undefined;

    try {
      // Try to fill the hopper
      await this._getWork();

      while (this.inFlight.length > 0) {
        /* Wait for one or more to be resolved.
         * We don't care what the result was, as we have to scan the list
         * for all settled promises anyhow, and our wrapper has stored the
         * results.
         */
        try {
          await Promise.race(this.inFlight);
        } catch (e) {
          // NOP here, as we'll handle it when we scan the list
        }
        this.logger.debug(_scope, { msg: 'race completed' });

        // Address settled promises..
        const settled = this._handleWatchedList(this._watchedHandler.bind(this));
        this.logger.debug(_scope, { settled });

        // Try to fill the vacancy
        // TODO: maybe rate-limit this call based on slot availability
        await this._getWork();
      }
    } catch (error) {
      // Anything still in flight stays listed and is picked up by the next run.
      this.logger.error(_scope, 'failed', { error });
    } finally {
      this.isProcessing = false;
    }

    // No more work (or a failure), wait a while and retry
    this._recurr();
  }

}
211
212 module.exports = Worker;