3 const common
= require('./common');
5 const _fileScope
= common
.fileScope(__filename
);
8 * Always try to do some things, but not too many.
9 * This is a generic polling promise-wrangler, keeping a set number
10 * of promises in flight, trying to replace them as they finish.
14 * @callback PromiseGiver
16 * @param {number} atMost
17 * @returns {Promise<void>[]}
22 * @param {object} logger logger instance
23 * @param {object} db db instance
24 * @param {PromiseGiver} promiseGiver function which fetches and processes work
25 * @param {object} options options
26 * @param {object} options.worker worker options
27 * @param {object} options.worker.pollingEnabled whether to run worker at all
28 * @param {number} options.worker.recurrSleepMs time between processing runs
29 * @param {number} options.worker.concurrency how much work to be working on at once
31 constructor(logger
, db
, promiseGiver
, options
) {
34 this.options
= options
;
35 if (!promiseGiver
|| typeof promiseGiver
!== 'function') {
36 throw new TypeError('function required');
38 this.promiseGiver
= promiseGiver
;
40 this.concurrency
= this.options
.worker
.concurrency
;
41 this.recurrSleepMs
= this.options
.worker
.recurrSleepMs
;
42 this.inFlight
= []; // Our work heap of Promises
43 this.nextTimeout
= undefined; // Allow clearTimeout() to reset waiting period.
44 this.running
= false; // Worker is running.
45 this.isProcessing
= false; // Only let one process() method execute on the work heap at a time
49 * Begin the scheduled loop.
50 * @param {number} stagger vary startup time by some fraction of recurrence
52 start(stagger
= 0.618) {
53 const _scope
= _fileScope('start');
54 this.logger
.debug(_scope
, 'called', {});
55 if (this.options
.worker
.pollingEnabled
) {
57 // Try to keep clustered nodes from all processing at the same time.
58 const staggerMs
= Math
.floor(Math
.random() * this.recurrSleepMs
* stagger
);
59 this.nextTimeout
= setTimeout(this._recurr
.bind(this), staggerMs
);
64 * Cancel the scheduled loop.
67 const _scope
= _fileScope('stop');
68 this.logger
.debug(_scope
, 'called', {});
70 clearTimeout(this.nextTimeout
);
71 this.nextTimeout
= undefined;
75 * The problem: Promise.race doesn't report which promise(s) settled, and
76 * there is no native interface for querying promise state.
77 * So we will wrap all our pending-work promises with a flag and the
78 * results, and use the race as a sort of condvar for checking everything
79 * in the list of what we were waiting for.
80 * NB this means promise cannot be further chained, or it loses the magic.
81 * @param {Promise} promise promise to watch
82 * @returns {Promise} watchedPromise
84 static watchedPromise(promise
) {
85 if (Object
.hasOwn(promise
, 'isSettled')) {
89 let isSettled
= false;
93 promise
= promise
.then(
105 Object
.defineProperties(promise
, {
106 isSettled: { get: () => isSettled
},
107 resolved: { get: () => resolved
},
108 rejected: { get: () => rejected
},
115 * @callback HandlerFunction
116 * @param {*} resolved
117 * @param {*} rejected
121 * Process the list of promises, removing any which have settled,
122 * and passes their fulfilled values to the handler.
123 * @param {HandlerFunction} handler invoked on settled promises
124 * @returns {number} handled promises removed from inFlight list
126 _handleWatchedList(handler
) {
128 for (let i
= this.inFlight
.length
- 1; i
>= 0; i
--) {
129 // eslint-disable-next-line security/detect-object-injection
130 const p
= this.inFlight
[i
];
132 handler(p
.resolved
, p
.rejected
);
133 this.inFlight
.splice(i
, 1);
141 * Refill the workpool with our special promises.
142 * @param {*} dbCtx db context
143 * @returns {Promise<Promise[]>} wrapped promises
145 async
_getWork(dbCtx
) {
146 const _scope
= _fileScope('_getWork');
147 let newPromises
= [];
148 const wanted
= this.concurrency
- this.inFlight
.length
;
150 newPromises
= await
this.promiseGiver(dbCtx
, wanted
);
151 newPromises
= newPromises
.map((p
) => Worker
.watchedPromise(p
));
152 common
.stackSafePush(this.inFlight
, newPromises
);
154 this.logger
.debug(_scope
, 'completed', { wanted
, added: newPromises
.length
});
159 * Simply log results of promises, for now.
160 * @param {*} resolved promise resolution value
161 * @param {*} rejected promise rejection value
163 _watchedHandler(resolved
, rejected
) {
164 const _scope
= _fileScope('_watchedHandler');
167 this.logger
.error(_scope
, 'rejected', { rejected
});
169 this.logger
.debug(_scope
, 'resolved', { resolved
});
174 * Schedule the next getWork.
177 if (this.running
&& this.recurrSleepMs
) {
178 this.nextTimeout
= setTimeout(this.process
.bind(this), this.recurrSleepMs
);
183 * Attempt to do as much work as we can.
186 const _scope
= _fileScope('process');
188 this.logger
.debug(_scope
, 'called', { isProcessing: this.isProcessing
});
191 if (this.isProcessing
) {
194 this.isProcessing
= true;
196 // Interrupt any pending sleep, if we were called out of timeout-cycle.
197 clearTimeout(this.nextTimeout
);
198 this.nextTimeout
= undefined;
201 await
this.db
.context(async (dbCtx
) => {
203 // Try to fill the hopper
204 await
this._getWork(dbCtx
);
206 while (this.inFlight
.length
> 0) {
207 /* Wait for one or more to be resolved.
208 * We don't care what the result was, as we have to scan the list
209 * for all settled promises anyhow, and our wrapper has stored the
213 await Promise
.race(this.inFlight
);
214 } catch (e
) { // eslint-disable-line no-unused-vars
215 // NOP here, as we'll handle it when we scan the list
218 // Address settled promises..
219 this._handleWatchedList(this._watchedHandler
.bind(this));
221 // Try to fill the vacancy
222 // TODO: maybe rate-limit this call based on slot availability
223 await
this._getWork(dbCtx
);
227 this.logger
.error(_scope
, 'failed', { error: e
});
228 // Try again later anyhow.
231 // No more work, wait a while and retry
234 this.isProcessing
= false;
239 module
.exports
= Worker
;