// Project-local helpers (scope naming for log lines, stack-safe array push).
const common = require('./common');

// Builds the per-method scope labels passed as the first argument to the logger.
const _fileScope = common.fileScope(__filename);
/**
 * Always try to do some things, but not too many.
 * This is a generic polling promise-wrangler, keeping a set number
 * of promises in flight, trying to replace them as they finish.
 */

/**
 * Supplies new units of work.  The return value is awaited, so this may be
 * an async function.
 * @callback Worker~promiseGiver
 * @param {*} dbCtx context handed out by db.context()
 * @param {number} atMost maximum number of promises wanted
 * @returns {Promise<void>[]}
 */

/**
 * Receives the outcome of one settled unit of work.
 * @callback Worker~HandlerFunction
 * @param {*} resolved fulfilled value, undefined if the promise rejected
 * @param {*} rejected rejection reason, undefined if the promise fulfilled
 */

class Worker {
  /**
   * @param {object} logger provides debug() and error()
   * @param {object} db provides context(async (dbCtx) => ...)
   * @param {Worker~promiseGiver} promiseGiver
   * @param {object} options
   * @param {object} options.worker
   * @param {boolean} options.worker.pollingEnabled start() only schedules when set
   * @param {number} options.worker.recurrSleepMs delay between processing passes
   * @param {number} options.worker.concurrency maximum promises kept in flight
   * @throws {TypeError} when promiseGiver is not a function
   */
  constructor(logger, db, promiseGiver, options) {
    this.logger = logger;
    this.db = db;
    this.options = options;
    if (typeof promiseGiver !== 'function') {
      throw new TypeError('function required');
    }
    this.promiseGiver = promiseGiver;

    this.concurrency = this.options.worker.concurrency;
    this.recurrSleepMs = this.options.worker.recurrSleepMs;
    this.inFlight = []; // Our work heap of Promises
    this.nextTimeout = undefined; // Allow clearTimeout() to reset waiting period.
    this.running = false; // Whether the scheduled loop should keep re-arming itself
    this.isProcessing = false; // Only let one process() method execute on the work heap at a time
  }


  /**
   * Begin the scheduled loop.
   * @param {number} stagger fraction of recurrSleepMs bounding the random initial delay
   */
  start(stagger = 0.618) {
    const _scope = _fileScope('start');
    this.logger.debug(_scope, 'called', {});
    if (this.options.worker.pollingEnabled) {
      this.running = true;
      // Try to keep clustered nodes from all processing at the same time.
      const staggerMs = Math.floor(Math.random() * this.recurrSleepMs * stagger);
      this.nextTimeout = setTimeout(this._recurr.bind(this), staggerMs);
    }
  }


  /**
   * Cancel the scheduled loop.
   */
  stop() {
    const _scope = _fileScope('stop');
    this.logger.debug(_scope, 'called', {});
    this.running = false;
    clearTimeout(this.nextTimeout);
    this.nextTimeout = undefined;
  }


  /**
   * The problem: Promise.race doesn't report which promise(s) settled, and
   * there is no native interface for querying promise state.
   * So we will wrap all our pending-work promises with a flag and the
   * results, and use the race as a sort of condvar for checking everything
   * in the list of what we were waiting for.
   * NB this means promise cannot be further chained, or it loses the magic.
   * @param {Promise} promise
   * @returns {Promise} watchedPromise exposing isSettled, resolved and rejected getters
   */
  static watchedPromise(promise) {
    // Already wrapped: hand it back untouched so the getters stay attached.
    if (Object.prototype.hasOwnProperty.call(promise, 'isSettled')) {
      return promise;
    }

    let isSettled = false;
    let resolved = undefined;
    let rejected = undefined;

    // Record the outcome, then pass it through so the wrapper settles the same way.
    promise = promise.then(
      (value) => {
        isSettled = true;
        resolved = value;
        return value;
      },
      (reason) => {
        isSettled = true;
        rejected = reason;
        throw reason;
      });

    Object.defineProperties(promise, {
      isSettled: { get: () => isSettled },
      resolved: { get: () => resolved },
      rejected: { get: () => rejected },
    });

    return promise;
  }


  /**
   * Process the list of promises, removing any which have settled,
   * and passes their fulfilled values to the handler.
   * @param {Worker~HandlerFunction} handler
   * @returns {number} handled
   */
  _handleWatchedList(handler) {
    let handled = 0;
    // Walk backwards so splicing does not disturb the indexes still to be visited.
    for (let i = this.inFlight.length - 1; i >= 0; i--) {
      // eslint-disable-next-line security/detect-object-injection
      const p = this.inFlight[i];
      if (p.isSettled) {
        handler(p.resolved, p.rejected);
        this.inFlight.splice(i, 1);
        handled += 1;
      }
    }
    return handled;
  }


  /**
   * Refill the workpool with our special promises.
   * @param {*} dbCtx
   * @returns {Promise[]} the newly added watched promises, possibly none
   */
  async _getWork(dbCtx) {
    const _scope = _fileScope('_getWork');
    let newPromises = [];
    const wanted = this.concurrency - this.inFlight.length;
    // Only ask for work when there is a free slot.
    if (wanted > 0) {
      newPromises = await this.promiseGiver(dbCtx, wanted);
      newPromises = newPromises.map((p) => Worker.watchedPromise(p));
      common.stackSafePush(this.inFlight, newPromises);
    }
    this.logger.debug(_scope, 'completed', { wanted, added: newPromises.length });
    return newPromises;
  }


  /**
   * Simply log results of promises, for now.
   * @param {*} resolved
   * @param {*} rejected
   */
  _watchedHandler(resolved, rejected) {
    const _scope = _fileScope('_watchedHandler');

    this.logger.debug(_scope, { resolved, rejected });
    if (rejected) {
      this.logger.error(_scope, { rejected });
    }
  }


  /**
   * Schedule the next getWork.
   * Does nothing unless the loop is running and a non-zero sleep is configured.
   */
  _recurr() {
    if (this.running && this.recurrSleepMs) {
      this.nextTimeout = setTimeout(this.process.bind(this), this.recurrSleepMs);
    }
  }


  /**
   * Attempt to do as much work as we can.
   * Returns immediately when another invocation is already draining the heap.
   * @returns {Promise<void>}
   */
  async process() {
    const _scope = _fileScope('process');

    this.logger.debug(_scope, 'called', { isProcessing: this.isProcessing });

    if (this.isProcessing) {
      return;
    }
    this.isProcessing = true;

    // Interrupt any pending sleep, if we were called out of timeout-cycle.
    clearTimeout(this.nextTimeout);

    try {
      await this.db.context(async (dbCtx) => {

        // Try to fill the hopper
        await this._getWork(dbCtx);

        while (this.inFlight.length > 0) {
          /* Wait for one or more to be resolved.
           * We don't care what the result was, as we have to scan the list
           * for all settled promises anyhow, and our wrapper has stored the
           * results.
           */
          try {
            await Promise.race(this.inFlight);
          } catch (e) { // eslint-disable-line no-unused-vars
            // NOP here, as we'll handle it when we scan the list
          }
          this.logger.debug(_scope, { msg: 'race completed' });

          // Address settled promises..
          const settled = this._handleWatchedList(this._watchedHandler.bind(this));
          this.logger.debug(_scope, { settled });

          // Try to fill the vacancy
          // TODO: maybe rate-limit this call based on slot availability
          await this._getWork(dbCtx);
        }
      });
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e });
      // Try again later anyhow.
    } finally {
      // Always release the guard, even if logging itself throws.
      this.isProcessing = false;
      // No more work, wait a while and retry
      this._recurr();
    }
  }

}
// The class itself is the module's sole export.
module.exports = Worker;