fix minor log entry issue
[websub-hub] / src / worker.js
1 'use strict';
2
3 const common = require('./common');
4
5 const _fileScope = common.fileScope(__filename);
6
7 /**
8 * Always try to do some things, but not too many.
9 * This is a generic polling promise-wrangler, keeping a set number
10 * of promises in flight, trying to replace them as they finish.
11 */
12
13 /**
14 * @callback Worker~promiseGiver
15 * @param {*} dbCtx
16 * @param {number} atMost
17 * @returns {Promise<void>[]}
18 */
19
/**
 * Generic polling promise-wrangler.
 * Keeps up to `concurrency` promises in flight, asking the promiseGiver for
 * replacements as they settle, and re-runs itself on a timer when polling
 * is enabled.
 */
class Worker {
  /**
   * @param {object} logger object providing debug(scope, message, data) and error(scope, message, data)
   * @param {object} db object providing context(fn), which invokes fn with a dbCtx
   * @param {Worker~promiseGiver} promiseGiver called with (dbCtx, atMost) to obtain new work; its result is awaited
   * @param {object} options
   * @param {object} options.worker
   * @param {boolean} options.worker.pollingEnabled whether start() schedules the loop
   * @param {number} options.worker.recurrSleepMs delay between processing passes; falsy disables rescheduling
   * @param {number} options.worker.concurrency maximum number of promises kept in flight
   * @throws {TypeError} when promiseGiver is not a function
   */
  constructor(logger, db, promiseGiver, options) {
    this.logger = logger;
    this.db = db;
    this.options = options;
    if (typeof promiseGiver !== 'function') {
      throw new TypeError('function required');
    }
    this.promiseGiver = promiseGiver;

    this.concurrency = this.options.worker.concurrency;
    this.recurrSleepMs = this.options.worker.recurrSleepMs;
    this.inFlight = []; // Our work heap of watched Promises
    this.nextTimeout = undefined; // Allow clearTimeout() to reset waiting period.
    this.running = false;
    this.isProcessing = false; // Only let one process() method execute on the work heap at a time
  }

  /**
   * Begin the scheduled loop, if polling is enabled.
   * @param {number} stagger fraction of recurrSleepMs bounding the random initial delay
   */
  start(stagger = 0.618) {
    const _scope = _fileScope('start');
    this.logger.debug(_scope, 'called', {});
    if (this.options.worker.pollingEnabled) {
      this.running = true;
      // A repeated start() must not orphan the previously scheduled timer.
      clearTimeout(this.nextTimeout);
      // Try to keep clustered nodes from all processing at the same time.
      const staggerMs = Math.floor(Math.random() * this.recurrSleepMs * stagger);
      this.nextTimeout = setTimeout(this._recurr.bind(this), staggerMs);
    }
  }

  /**
   * Cancel the scheduled loop.
   * Clears the pending timer; a process() pass already executing is not interrupted.
   */
  stop() {
    const _scope = _fileScope('stop');
    this.logger.debug(_scope, 'called', {});
    this.running = false;
    clearTimeout(this.nextTimeout);
    this.nextTimeout = undefined;
  }

  /**
   * The problem: Promise.race doesn't report which promise(s) settled, and
   * there is no native interface for querying promise state.
   * So we wrap each pending-work promise, recording a settled flag and the
   * outcome on the wrapper, and use the race as a sort of condvar for
   * checking everything in the list of what we were waiting for.
   * NB the returned promise cannot be further chained, or it loses the magic:
   * the accessors live on the returned object only.
   * An already-watched promise is returned unchanged.
   * @param {Promise} promise
   * @returns {Promise} watchedPromise, with isSettled, resolved, rejected accessors
   */
  static watchedPromise(promise) {
    if (Object.prototype.hasOwnProperty.call(promise, 'isSettled')) {
      return promise;
    }

    let isSettled = false;
    let resolved = undefined;
    let rejected = undefined;

    const watched = promise.then(
      (res) => {
        isSettled = true;
        resolved = res;
        return res;
      },
      (rej) => {
        isSettled = true;
        rejected = rej;
        throw rej; // Preserve the rejection for whoever awaits the wrapper.
      });

    Object.defineProperties(watched, {
      isSettled: { get: () => isSettled },
      resolved: { get: () => resolved },
      rejected: { get: () => rejected },
    });

    return watched;
  }

  /**
   * Process the list of promises, removing any which have settled,
   * and passing their fulfilled value and rejection reason to the handler.
   * Each settled promise is removed before the handler is invoked, so that
   * a throwing handler cannot leave it behind to be handled again.
   *
   * @param {HandlerFunction} handler called as handler(resolved, rejected)
   * @returns {number} handled
   */
  _handleWatchedList(handler) {
    let handled = 0;
    // Walk backwards so that splicing does not disturb unvisited indices.
    for (let i = this.inFlight.length - 1; i >= 0; i--) {
      // eslint-disable-next-line security/detect-object-injection
      const p = this.inFlight[i];
      if (p.isSettled) {
        this.inFlight.splice(i, 1);
        handled += 1;
        handler(p.resolved, p.rejected);
      }
    }
    return handled;
  }

  /**
   * Refill the workpool with our special promises.
   * Requests at most the number of free slots from the promiseGiver, wraps
   * what it returns, and appends it to the in-flight list.
   * @param {*} dbCtx
   * @returns {Promise<Promise[]>} the newly added watched promises
   */
  async _getWork(dbCtx) {
    const _scope = _fileScope('_getWork');
    let newPromises = [];
    const wanted = this.concurrency - this.inFlight.length;
    if (wanted > 0) {
      newPromises = await this.promiseGiver(dbCtx, wanted);
      newPromises = newPromises.map((p) => Worker.watchedPromise(p));
      common.stackSafePush(this.inFlight, newPromises);
    }
    this.logger.debug(_scope, 'completed', { wanted, added: newPromises.length });
    return newPromises;
  }

  /**
   * Simply log results of promises, for now.
   * @param {*} resolved
   * @param {*} rejected
   */
  _watchedHandler(resolved, rejected) {
    const _scope = _fileScope('_watchedHandler');

    this.logger.debug(_scope, 'settled', { resolved, rejected });
    if (rejected) {
      this.logger.error(_scope, 'rejected', { rejected });
    }
  }

  /**
   * Schedule the next process() pass, when running and a sleep interval is set.
   */
  _recurr() {
    if (this.running && this.recurrSleepMs) {
      this.nextTimeout = setTimeout(this.process.bind(this), this.recurrSleepMs);
    }
  }

  /**
   * Attempt to do as much work as we can.
   * Fills the work list, then repeatedly waits for something to settle,
   * handles whatever has settled, and refills, until nothing is in flight.
   * Returns immediately if another pass is already executing.
   * @returns {Promise<void>}
   */
  async process() {
    const _scope = _fileScope('process');

    this.logger.debug(_scope, 'called', { isProcessing: this.isProcessing });

    if (this.isProcessing) {
      return;
    }
    this.isProcessing = true;

    // Interrupt any pending sleep, if we were called out of timeout-cycle.
    clearTimeout(this.nextTimeout);
    this.nextTimeout = undefined;

    try {
      await this.db.context(async (dbCtx) => {

        // Try to fill the hopper
        await this._getWork(dbCtx);

        while (this.inFlight.length > 0) {
          /* Wait for one or more to be settled.
           * We don't care what the result was, as we have to scan the list
           * for all settled promises anyhow, and our wrapper has stored the
           * results.
           */
          try {
            await Promise.race(this.inFlight);
          } catch (e) {
            // NOP here, as we'll handle it when we scan the list
          }
          this.logger.debug(_scope, 'race completed', {});

          // Address settled promises..
          const settled = this._handleWatchedList((resolved, rejected) => this._watchedHandler(resolved, rejected));
          this.logger.debug(_scope, 'handled settled', { settled });

          // Try to fill the vacancy
          // TODO: maybe rate-limit this call based on slot availability
          await this._getWork(dbCtx);
        }
      }); // dbCtx
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e });
      // Try again later anyhow.
    } finally {
      // Always release the guard and re-arm the loop, even if the error
      // path itself threw; otherwise the worker would be wedged forever.
      this.isProcessing = false;

      // No more work, wait a while and retry
      this._recurr();
    }
  }

}
232
233 module.exports = Worker;