Merge branch 'v1.3-dev'
[websub-hub] / src / worker.js
1 'use strict';
2
3 const common = require('./common');
4
5 const _fileScope = common.fileScope(__filename);
6
7 /**
8 * Always try to do some things, but not too many.
9 * This is a generic polling promise-wrangler, keeping a set number
10 * of promises in flight, trying to replace them as they finish.
11 */
12
13 /**
14 * @callback PromiseGiver
15 * @param {*} dbCtx
16 * @param {number} atMost
17 * @returns {Promise<void>[]}
18 */
19
class Worker {
  /**
   * Generic polling work-pool: keeps up to `concurrency` promises in flight,
   * asking `promiseGiver` for replacements as earlier ones settle.
   * @param {object} logger logger instance (uses `debug` and `error`)
   * @param {object} db db instance (uses `context(fn)`)
   * @param {PromiseGiver} promiseGiver function which fetches and processes work
   * @param {object} options options
   * @param {object} options.worker worker options
   * @param {boolean} options.worker.pollingEnabled whether to run worker at all
   * @param {number} options.worker.recurrSleepMs time between processing runs
   * @param {number} options.worker.concurrency how much work to be working on at once
   * @throws {TypeError} when promiseGiver is not a function
   */
  constructor(logger, db, promiseGiver, options) {
    this.logger = logger;
    this.db = db;
    this.options = options;
    if (!promiseGiver || typeof promiseGiver !== 'function') {
      throw new TypeError('function required');
    }
    this.promiseGiver = promiseGiver;

    this.concurrency = this.options.worker.concurrency;
    this.recurrSleepMs = this.options.worker.recurrSleepMs;
    this.inFlight = []; // Our work heap of Promises
    this.nextTimeout = undefined; // Allow clearTimeout() to reset waiting period.
    this.running = false; // Worker is running.
    this.isProcessing = false; // Only let one process() method execute on the work heap at a time
  }

  /**
   * Begin the scheduled loop.
   * Does nothing beyond logging unless polling is enabled in options.
   * @param {number} stagger vary startup time by some fraction of recurrence
   */
  start(stagger = 0.618) {
    const _scope = _fileScope('start');
    this.logger.debug(_scope, 'called', {});
    if (this.options.worker.pollingEnabled) {
      this.running = true;
      // A repeated start() must not orphan an already-pending timer, which
      // stop() would then be unable to cancel.
      clearTimeout(this.nextTimeout);
      // Try to keep clustered nodes from all processing at the same time.
      const staggerMs = Math.floor(Math.random() * this.recurrSleepMs * stagger);
      this.nextTimeout = setTimeout(this._recurr.bind(this), staggerMs);
    }
  }

  /**
   * Cancel the scheduled loop.
   * Only the pending timer is cancelled; a process() run already underway
   * is not interrupted, but it will not schedule a successor.
   */
  stop() {
    const _scope = _fileScope('stop');
    this.logger.debug(_scope, 'called', {});
    this.running = false;
    clearTimeout(this.nextTimeout);
    this.nextTimeout = undefined;
  }

  /**
   * The problem: Promise.race doesn't report which promise(s) settled, and
   * there is no native interface for querying promise state.
   * So we will wrap all our pending-work promises with a flag and the
   * results, and use the race as a sort of condvar for checking everything
   * in the list of what we were waiting for.
   * The returned promise exposes read-only `isSettled`, `isRejected`,
   * `resolved` and `rejected` properties. `isRejected` exists because a
   * rejection value may itself be falsy (e.g. `undefined`), so `rejected`
   * alone cannot tell the two outcomes apart.
   * NB this means promise cannot be further chained, or it loses the magic.
   * @param {Promise} promise promise to watch
   * @returns {Promise} watchedPromise
   */
  static watchedPromise(promise) {
    // Already wrapped; wrapping again would only add a needless layer.
    if (Object.hasOwn(promise, 'isSettled')) {
      return promise;
    }

    let isSettled = false;
    let isRejected = false;
    let resolved;
    let rejected;

    const watched = promise.then(
      (res) => {
        resolved = res;
        isSettled = true;
        return res;
      },
      (rej) => {
        rejected = rej;
        isRejected = true;
        isSettled = true;
        // Preserve the rejection so the wrapper settles like the original.
        throw rej;
      });

    Object.defineProperties(watched, {
      isSettled: { get: () => isSettled },
      isRejected: { get: () => isRejected },
      resolved: { get: () => resolved },
      rejected: { get: () => rejected },
    });

    return watched;
  }

  /**
   * @callback HandlerFunction
   * @param {*} resolved
   * @param {*} rejected
   * @param {boolean=} isRejected true when the promise rejected
   * @returns {void}
   */
  /**
   * Process the list of promises, removing any which have settled,
   * and passes their settled values to the handler.
   * Each settled entry is removed before the handler runs, so a throwing
   * handler cannot leave it in the list to be delivered again.
   * @param {HandlerFunction} handler invoked on settled promises
   * @returns {number} handled promises removed from inFlight list
   */
  _handleWatchedList(handler) {
    let handled = 0;
    // Walk backwards so splicing does not disturb indexes yet to be visited.
    for (let i = this.inFlight.length - 1; i >= 0; i--) {
      // eslint-disable-next-line security/detect-object-injection
      const p = this.inFlight[i];
      if (p.isSettled) {
        this.inFlight.splice(i, 1);
        handled += 1;
        handler(p.resolved, p.rejected, p.isRejected);
      }
    }
    return handled;
  }

  /**
   * Refill the workpool with our special promises.
   * Asks the promiseGiver for at most the number of free slots, wraps each
   * result with watchedPromise, and appends them to the in-flight list.
   * @param {*} dbCtx db context
   * @returns {Promise<Promise[]>} wrapped promises
   */
  async _getWork(dbCtx) {
    const _scope = _fileScope('_getWork');
    let newPromises = [];
    const wanted = this.concurrency - this.inFlight.length;
    if (wanted > 0) {
      newPromises = await this.promiseGiver(dbCtx, wanted);
      newPromises = newPromises.map((p) => Worker.watchedPromise(p));
      common.stackSafePush(this.inFlight, newPromises);
    }
    this.logger.debug(_scope, 'completed', { wanted, added: newPromises.length });
    return newPromises;
  }

  /**
   * Simply log results of promises, for now.
   * @param {*} resolved promise resolution value
   * @param {*} rejected promise rejection value
   * @param {boolean} [isRejected] whether the promise rejected; when omitted,
   *   falls back to the truthiness of `rejected`
   */
  _watchedHandler(resolved, rejected, isRejected = Boolean(rejected)) {
    const _scope = _fileScope('_watchedHandler');

    if (isRejected) {
      this.logger.error(_scope, 'rejected', { rejected });
    } else {
      this.logger.debug(_scope, 'resolved', { resolved });
    }
  }

  /**
   * Schedule the next getWork.
   * Only schedules while the worker is running and a non-zero sleep
   * interval is configured.
   */
  _recurr() {
    if (this.running && this.recurrSleepMs) {
      this.nextTimeout = setTimeout(this.process.bind(this), this.recurrSleepMs);
    }
  }

  /**
   * Attempt to do as much work as we can.
   * Returns immediately if another process() run is already underway.
   * Failures are logged rather than thrown, and the next run is scheduled
   * regardless of outcome.
   * @returns {Promise<void>}
   */
  async process() {
    const _scope = _fileScope('process');

    this.logger.debug(_scope, 'called', { isProcessing: this.isProcessing });

    if (this.isProcessing) {
      return;
    }
    this.isProcessing = true;

    // Interrupt any pending sleep, if we were called out of timeout-cycle.
    clearTimeout(this.nextTimeout);
    this.nextTimeout = undefined;

    const handler = this._watchedHandler.bind(this);

    try {
      await this.db.context(async (dbCtx) => {

        // Try to fill the hopper
        await this._getWork(dbCtx);

        while (this.inFlight.length > 0) {
          /* Wait for one or more to be resolved.
           * We don't care what the result was, as we have to scan the list
           * for all settled promises anyhow, and our wrapper has stored the
           * results.
           */
          try {
            await Promise.race(this.inFlight);
          } catch (e) { // eslint-disable-line no-unused-vars
            // NOP here, as we'll handle it when we scan the list
          }

          // Address settled promises..
          this._handleWatchedList(handler);

          // Try to fill the vacancy
          // TODO: maybe rate-limit this call based on slot availability
          await this._getWork(dbCtx);
        }
      }); // dbCtx
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e });
      // Try again later anyhow.
    } finally {
      // Always re-arm and release the guard, even if logging the failure
      // itself threw; otherwise the worker could never process again.
      // No more work, wait a while and retry
      this._recurr();

      this.isProcessing = false;
    }
  }

}
238
239 module.exports = Worker;