3 const common
= require('./common');
5 const _fileScope
= common
.fileScope(__filename
);
8 * Always try to do some things, but not too many.
9 * This is a generic polling promise-wrangler, keeping a set number
10 * of promises in flight, trying to replace them as they finish.
14 * @callback Worker~promiseGiver
15 * @param {number} atMost
16 * @returns {Promise<void>[]}
21 * @param {object} logger
22 * @param {Worker~promiseGiver} promiseGiver
23 * @param {object} options
24 * @param {object} options.worker
25 * @param {boolean} options.worker.pollingEnabled
26 * @param {number} options.worker.recurrSleepMs
27 * @param {number} options.worker.concurrency
29 constructor(logger
, promiseGiver
, options
) {
31 this.options
= options
;
32 if (!promiseGiver
|| typeof promiseGiver
!== 'function') {
33 throw new TypeError('function required');
35 this.promiseGiver
= promiseGiver
;
37 this.concurrency
= this.options
.worker
.concurrency
;
38 this.recurrSleepMs
= this.options
.worker
.recurrSleepMs
;
39 this.inFlight
= []; // Our work heap of Promises
40 this.nextTimeout
= undefined; // Allow clearTimeout() to reset waiting period.
45 * Begin the scheduled loop.
47 start(stagger
= 0.618) {
48 const _scope
= _fileScope('start');
49 this.logger
.debug(_scope
, 'called', {});
50 if (this.options
.worker
.pollingEnabled
) {
52 // Try to keep clustered nodes from all processing at the same time.
53 const staggerMs
= Math
.floor(Math
.random() * this.recurrSleepMs
* stagger
);
54 this.nextTimeout
= setTimeout(this._recurr
.bind(this), staggerMs
);
59 * Cancel the scheduled loop.
62 const _scope
= _fileScope('stop');
63 this.logger
.debug(_scope
, 'called', {});
65 clearTimeout(this.nextTimeout
);
66 this.nextTimeout
= undefined;
70 * The problem: Promise.race doesn't report which promise(s) settled, and
71 * there is no native interface for querying promise state.
72 * So we will wrap all our pending-work promises with a flag and the
73 * results, and use the race as a sort of condvar for checking everything
74 * in the list of what we were waiting for.
75 * NB this means promise cannot be further chained, or it loses the magic.
76 * @param {Promise} promise
77 * @returns {Promise} watchedPromise
79 static watchedPromise(promise
) {
80 if (Object
.prototype.hasOwnProperty
.call(promise
, 'isSettled')) {
84 let isSettled
= false;
85 let resolved
= undefined;
86 let rejected
= undefined;
88 promise
= promise
.then(
100 Object
.defineProperties(promise
, {
101 isSettled: { get: () => isSettled
},
102 resolved: { get: () => resolved
},
103 rejected: { get: () => rejected
},
110 * Process the list of promises, removing any which have settled,
111 * and passing their resolved and rejected values to the handler.
113 * @param {HandlerFunction} handler
114 * @returns {number} handled
116 _handleWatchedList(handler
) {
118 for (let i
= this.inFlight
.length
- 1; i
>= 0; i
--) {
119 // eslint-disable-next-line security/detect-object-injection
120 const p
= this.inFlight
[i
];
122 handler(p
.resolved
, p
.rejected
);
123 this.inFlight
.splice(i
, 1);
131 * Refill the workpool with our special promises.
132 * @returns {Promise[]}
135 const _scope
= _fileScope('_getWork');
136 let newPromises
= [];
137 const wanted
= this.concurrency
- this.inFlight
.length
;
139 newPromises
= await
this.promiseGiver(wanted
);
140 newPromises
= newPromises
.map((p
) => Worker
.watchedPromise(p
));
141 common
.stackSafePush(this.inFlight
, newPromises
);
143 this.logger
.debug(_scope
, 'completed', { wanted
, added: newPromises
.length
});
148 * Simply log results of promises, for now.
149 * @param {*} resolved
150 * @param {*} rejected
152 _watchedHandler(resolved
, rejected
) {
153 const _scope
= _fileScope('_watchedHandler');
155 this.logger
.debug(_scope
, { resolved
, rejected
});
157 this.logger
.error(_scope
, { rejected
});
162 * Schedule the next call to process(), if still running.
165 if (this.running
&& this.recurrSleepMs
) {
166 this.nextTimeout
= setTimeout(this.process
.bind(this), this.recurrSleepMs
);
171 * Attempt to do as much work as we can.
174 const _scope
= _fileScope('process');
176 this.logger
.debug(_scope
, 'called', {});
178 // Interrupt any pending sleep, if we were called out of timeout-cycle.
179 clearTimeout(this.nextTimeout
);
181 // Try to fill the hopper
182 await
this._getWork();
184 while (this.inFlight
.length
> 0) {
185 /* Wait for one or more to be resolved.
186 * We don't care what the result was, as we have to scan the list
187 * for all settled promises anyhow, and our wrapper has stored the
191 await Promise
.race(this.inFlight
);
193 // NOP here, as we'll handle it when we scan the list
195 this.logger
.debug(_scope
, { msg: 'race completed' });
197 // Address settled promises.
198 const settled
= this._handleWatchedList(this._watchedHandler
.bind(this));
199 this.logger
.debug(_scope
, { settled
});
201 // Try to fill the vacancy
202 // TODO: maybe rate-limit this call based on slot availability
203 await
this._getWork();
206 // No more work, wait a while and retry
212 module
.exports
= Worker
;