9b6fd7ab882752f00dc76ae693db8980e8eadc76
[websub-hub] / src / worker.js
1 'use strict';
2
3 const common = require('./common');
4
5 const _fileScope = common.fileScope(__filename);
6
7 /**
8 * Always try to do some things, but not too many.
9 * This is a generic polling promise-wrangler, keeping a set number
10 * of promises in flight, trying to replace them as they finish.
11 */
12
13 /**
14 * @callback Worker~promiseGiver
15 * @param {*} dbCtx
16 * @param {number} atMost
17 * @returns {Promise<void>[]}
18 */
19
20 class Worker {
21 /**
22 * @param {object} logger
23 * @param {object} db
24 * @param {Worker~promiseGiver} promiseGiver
25 * @param {object} options
26 * @param {object} options.worker
27 * @param {object} options.worker.pollingEnabled
28 * @param {number} options.worker.recurrSleepMs
29 * @param {number} options.worker.concurrency
30 */
31 constructor(logger, db, promiseGiver, options) {
32 this.logger = logger;
33 this.db = db;
34 this.options = options;
35 if (!promiseGiver || typeof promiseGiver !== 'function') {
36 throw new TypeError('function required');
37 }
38 this.promiseGiver = promiseGiver;
39
40 this.concurrency = this.options.worker.concurrency;
41 this.recurrSleepMs = this.options.worker.recurrSleepMs;
42 this.inFlight = []; // Our work heap of Promises
43 this.nextTimeout = undefined; // Allow clearTimeout() to reset waiting period.
44 this.running = false;
45 }
46
47 /**
48 * Begin the scheduled loop.
49 */
50 start(stagger = 0.618) {
51 const _scope = _fileScope('start');
52 this.logger.debug(_scope, 'called', {});
53 if (this.options.worker.pollingEnabled) {
54 this.running = true;
55 // Try to keep clustered nodes from all processing at the same time.
56 const staggerMs = Math.floor(Math.random() * this.recurrSleepMs * stagger);
57 this.nextTimeout = setTimeout(this._recurr.bind(this), staggerMs);
58 }
59 }
60
61 /**
62 * Cancel the scheduled loop.
63 */
64 stop() {
65 const _scope = _fileScope('stop');
66 this.logger.debug(_scope, 'called', {});
67 this.running = false;
68 clearTimeout(this.nextTimeout);
69 this.nextTimeout = undefined;
70 }
71
72 /**
73 * The problem: Promise.race doesn't report which promise(s) settled, and
74 * there is no native interface for querying promise state.
75 * So we will wrap all our pending-work promises with a flag and the
76 * results, and use the race as a sort of condvar for checking everything
77 * in the list of what we were waiting for.
78 * NB this means promise cannot be further chained, or it loses the magic.
79 * @param {Promise} promise
80 * @returns {Promise} watchedPromise
81 */
82 static watchedPromise(promise) {
83 if (Object.prototype.hasOwnProperty.call(promise, 'isSettled')) {
84 return promise;
85 }
86
87 let isSettled = false;
88 let resolved = undefined;
89 let rejected = undefined;
90
91 promise = promise.then(
92 (res) => {
93 isSettled = true;
94 resolved = res;
95 return res;
96 },
97 (rej) => {
98 isSettled = true;
99 rejected = rej;
100 throw rej;
101 });
102
103 Object.defineProperties(promise, {
104 isSettled: { get: () => isSettled },
105 resolved: { get: () => resolved },
106 rejected: { get: () => rejected },
107 });
108
109 return promise;
110 }
111
112 /**
113 * Process the list of promises, removing any which have settled,
114 * and passes their fulfilled values to the handler.
115 *
116 * @param {HandlerFunction} handler
117 * @returns {number} handled
118 */
119 _handleWatchedList(handler) {
120 let handled = 0;
121 for (let i = this.inFlight.length - 1; i >= 0; i--) {
122 // eslint-disable-next-line security/detect-object-injection
123 const p = this.inFlight[i];
124 if (p.isSettled) {
125 handler(p.resolved, p.rejected);
126 this.inFlight.splice(i, 1);
127 handled += 1;
128 }
129 }
130 return handled;
131 }
132
133 /**
134 * Refill the workpool with our special promises.
135 * @param {*} dbCtx
136 * @returns {Promise[]}
137 */
138 async _getWork(dbCtx) {
139 const _scope = _fileScope('_getWork');
140 let newPromises = [];
141 const wanted = this.concurrency - this.inFlight.length;
142 if (wanted > 0) {
143 newPromises = await this.promiseGiver(dbCtx, wanted);
144 newPromises = newPromises.map((p) => Worker.watchedPromise(p));
145 common.stackSafePush(this.inFlight, newPromises);
146 }
147 this.logger.debug(_scope, 'completed', { wanted, added: newPromises.length });
148 return newPromises;
149 }
150
151 /**
152 * Simply log results of promises, for now.
153 * @param {*} resolved
154 * @param {*} rejected
155 */
156 _watchedHandler(resolved, rejected) {
157 const _scope = _fileScope('_watchedHandler');
158
159 this.logger.debug(_scope, { resolved, rejected });
160 if (rejected) {
161 this.logger.error(_scope, { rejected });
162 }
163 }
164
165 /**
166 * Schedule the next getWork.
167 */
168 _recurr() {
169 if (this.running && this.recurrSleepMs) {
170 this.nextTimeout = setTimeout(this.process.bind(this), this.recurrSleepMs);
171 }
172 }
173
174 /**
175 * Attempt to do as much work as we can.
176 */
177 async process() {
178 const _scope = _fileScope('process');
179
180 this.logger.debug(_scope, 'called', {});
181
182 // Interrupt any pending sleep, if we were called out of timeout-cycle.
183 clearTimeout(this.nextTimeout);
184
185 // Share one db connection for all tasks.
186 try {
187 await this.db.context(async (dbCtx) => {
188
189 // Try to fill the hopper
190 await this._getWork(dbCtx);
191
192 while (this.inFlight.length > 0) {
193 /* Wait for one or more to be resolved.
194 * We don't care what the result was, as we have to scan the list
195 * for all settled promises anyhow, and our wrapper has stored the
196 * results.
197 */
198 try {
199 await Promise.race(this.inFlight);
200 } catch (e) {
201 // NOP here, as we'll handle it when we scan the list
202 }
203 this.logger.debug(_scope, { msg: 'race completed' });
204
205 // Address settled promises..
206 const settled = this._handleWatchedList(this._watchedHandler.bind(this));
207 this.logger.debug(_scope, { settled });
208
209 // Try to fill the vacancy
210 // TODO: maybe rate-limit this call based on slot availability
211 await this._getWork(dbCtx);
212 }
213 }); // dbCtx
214 } catch (e) {
215 this.logger.error(_scope, 'failed', { error: e });
216 // Try again later anyhow.
217 }
218
219 // No more work, wait a while and retry
220 this._recurr();
221 }
222
223 }
224
225 module.exports = Worker;