3 const common
= require('./common');
5 const _fileScope
= common
.fileScope(__filename
);
8 * Always try to do some things, but not too many.
9 * This is a generic polling promise-wrangler, keeping a set number
10 * of promises in flight, trying to replace them as they finish.
14 * @callback Worker~promiseGiver
16 * @param {number} atMost
17 * @returns {Promise<void>[]}
22 * @param {object} logger
24 * @param {Worker~promiseGiver} promiseGiver
25 * @param {object} options
26 * @param {object} options.worker
27 * @param {object} options.worker.pollingEnabled
28 * @param {number} options.worker.recurrSleepMs
29 * @param {number} options.worker.concurrency
31 constructor(logger
, db
, promiseGiver
, options
) {
34 this.options
= options
;
35 if (!promiseGiver
|| typeof promiseGiver
!== 'function') {
36 throw new TypeError('function required');
38 this.promiseGiver
= promiseGiver
;
40 this.concurrency
= this.options
.worker
.concurrency
;
41 this.recurrSleepMs
= this.options
.worker
.recurrSleepMs
;
42 this.inFlight
= []; // Our work heap of Promises
43 this.nextTimeout
= undefined; // Allow clearTimeout() to reset waiting period.
48 * Begin the scheduled loop.
50 start(stagger
= 0.618) {
51 const _scope
= _fileScope('start');
52 this.logger
.debug(_scope
, 'called', {});
53 if (this.options
.worker
.pollingEnabled
) {
55 // Try to keep clustered nodes from all processing at the same time.
56 const staggerMs
= Math
.floor(Math
.random() * this.recurrSleepMs
* stagger
);
57 this.nextTimeout
= setTimeout(this._recurr
.bind(this), staggerMs
);
62 * Cancel the scheduled loop.
65 const _scope
= _fileScope('stop');
66 this.logger
.debug(_scope
, 'called', {});
68 clearTimeout(this.nextTimeout
);
69 this.nextTimeout
= undefined;
73 * The problem: Promise.race doesn't report which promise(s) settled, and
74 * there is no native interface for querying promise state.
75 * So we will wrap all our pending-work promises with a flag and the
76 * results, and use the race as a sort of condvar for checking everything
77 * in the list of what we were waiting for.
78 * NB this means promise cannot be further chained, or it loses the magic.
79 * @param {Promise} promise
80 * @returns {Promise} watchedPromise
82 static watchedPromise(promise
) {
83 if (Object
.prototype.hasOwnProperty
.call(promise
, 'isSettled')) {
87 let isSettled
= false;
88 let resolved
= undefined;
89 let rejected
= undefined;
91 promise
= promise
.then(
103 Object
.defineProperties(promise
, {
104 isSettled: { get: () => isSettled
},
105 resolved: { get: () => resolved
},
106 rejected: { get: () => rejected
},
113 * Process the list of promises, removing any which have settled,
114 * and passes their fulfilled values to the handler.
116 * @param {HandlerFunction} handler
117 * @returns {number} handled
119 _handleWatchedList(handler
) {
121 for (let i
= this.inFlight
.length
- 1; i
>= 0; i
--) {
122 // eslint-disable-next-line security/detect-object-injection
123 const p
= this.inFlight
[i
];
125 handler(p
.resolved
, p
.rejected
);
126 this.inFlight
.splice(i
, 1);
134 * Refill the workpool with our special promises.
136 * @returns {Promise[]}
138 async
_getWork(dbCtx
) {
139 const _scope
= _fileScope('_getWork');
140 let newPromises
= [];
141 const wanted
= this.concurrency
- this.inFlight
.length
;
143 newPromises
= await
this.promiseGiver(dbCtx
, wanted
);
144 newPromises
= newPromises
.map((p
) => Worker
.watchedPromise(p
));
145 common
.stackSafePush(this.inFlight
, newPromises
);
147 this.logger
.debug(_scope
, 'completed', { wanted
, added: newPromises
.length
});
152 * Simply log results of promises, for now.
153 * @param {*} resolved
154 * @param {*} rejected
156 _watchedHandler(resolved
, rejected
) {
157 const _scope
= _fileScope('_watchedHandler');
159 this.logger
.debug(_scope
, { resolved
, rejected
});
161 this.logger
.error(_scope
, { rejected
});
166 * Schedule the next getWork.
169 if (this.running
&& this.recurrSleepMs
) {
170 this.nextTimeout
= setTimeout(this.process
.bind(this), this.recurrSleepMs
);
175 * Attempt to do as much work as we can.
178 const _scope
= _fileScope('process');
180 this.logger
.debug(_scope
, 'called', {});
182 // Interrupt any pending sleep, if we were called out of timeout-cycle.
183 clearTimeout(this.nextTimeout
);
185 // Share one db connection for all tasks.
187 await
this.db
.context(async (dbCtx
) => {
189 // Try to fill the hopper
190 await
this._getWork(dbCtx
);
192 while (this.inFlight
.length
> 0) {
193 /* Wait for one or more to be resolved.
194 * We don't care what the result was, as we have to scan the list
195 * for all settled promises anyhow, and our wrapper has stored the
199 await Promise
.race(this.inFlight
);
201 // NOP here, as we'll handle it when we scan the list
203 this.logger
.debug(_scope
, { msg: 'race completed' });
205 // Address settled promises..
206 const settled
= this._handleWatchedList(this._watchedHandler
.bind(this));
207 this.logger
.debug(_scope
, { settled
});
209 // Try to fill the vacancy
210 // TODO: maybe rate-limit this call based on slot availability
211 await
this._getWork(dbCtx
);
215 this.logger
.error(_scope
, 'failed', { error: e
});
216 // Try again later anyhow.
219 // No more work, wait a while and retry
225 module
.exports
= Worker
;