*/
/**
- * @callback Worker~promiseGiver
+ * @callback PromiseGiver
* @param {*} dbCtx
* @param {number} atMost
* @returns {Promise<void>[]}
class Worker {
/**
- * @param {object} logger
- * @param {object} db
- * @param {Worker~promiseGiver} promiseGiver
- * @param {object} options
- * @param {object} options.worker
- * @param {object} options.worker.pollingEnabled
- * @param {number} options.worker.recurrSleepMs
- * @param {number} options.worker.concurrency
+ * @param {object} logger logger instance
+ * @param {object} db db instance
+ * @param {PromiseGiver} promiseGiver function which fetches and processes work
+ * @param {object} options options
+ * @param {object} options.worker worker options
+ * @param {object} options.worker.pollingEnabled whether to run worker at all
+ * @param {number} options.worker.recurrSleepMs time between processing runs
+ * @param {number} options.worker.concurrency how much work to be working on at once
*/
constructor(logger, db, promiseGiver, options) {
this.logger = logger;
this.recurrSleepMs = this.options.worker.recurrSleepMs;
this.inFlight = []; // Our work heap of Promises
this.nextTimeout = undefined; // Allow clearTimeout() to reset waiting period.
- this.running = false;
+ this.running = false; // Worker is running.
this.isProcessing = false; // Only let one process() method execute on the work heap at a time
}
/**
* Begin the scheduled loop.
+ * @param {number} stagger vary startup time by some fraction of recurrence
*/
start(stagger = 0.618) {
const _scope = _fileScope('start');
* results, and use the race as a sort of condvar for checking everything
* in the list of what we were waiting for.
* NB this means promise cannot be further chained, or it loses the magic.
- * @param {Promise} promise
+ * @param {Promise} promise promise to watch
* @returns {Promise} watchedPromise
*/
static watchedPromise(promise) {
- if (Object.prototype.hasOwnProperty.call(promise, 'isSettled')) {
+ if (Object.hasOwn(promise, 'isSettled')) {
return promise;
}
let isSettled = false;
- let resolved = undefined;
- let rejected = undefined;
+ let resolved;
+ let rejected;
promise = promise.then(
(res) => {
return promise;
}
+ /**
+ * @callback HandlerFunction
+ * @param {*} resolved
+ * @param {*} rejected
+ * @returns {void}
+ */
/**
* Process the list of promises, removing any which have settled,
* and passes their fulfilled values to the handler.
- *
- * @param {HandlerFunction} handler
- * @returns {number} handled
+ * @param {HandlerFunction} handler invoked on settled promises
+ * @returns {number} handled promises removed from inFlight list
*/
_handleWatchedList(handler) {
let handled = 0;
/**
* Refill the workpool with our special promises.
- * @param {*} dbCtx
- * @returns {Promise[]}
+ * @param {*} dbCtx db context
+ * @returns {Promise<Promise[]>} wrapped promises
*/
async _getWork(dbCtx) {
const _scope = _fileScope('_getWork');
/**
* Simply log results of promises, for now.
- * @param {*} resolved
- * @param {*} rejected
+ * @param {*} resolved promise resolution value
+ * @param {*} rejected promise rejection value
*/
_watchedHandler(resolved, rejected) {
const _scope = _fileScope('_watchedHandler');
- this.logger.debug(_scope, { resolved, rejected });
if (rejected) {
- this.logger.error(_scope, { rejected });
+ this.logger.error(_scope, 'rejected', { rejected });
+ } else {
+ this.logger.debug(_scope, 'resolved', { resolved });
}
}
// Interrupt any pending sleep, if we were called out of timeout-cycle.
clearTimeout(this.nextTimeout);
+ this.nextTimeout = undefined;
- // Share one db connection for all tasks.
try {
await this.db.context(async (dbCtx) => {
*/
try {
await Promise.race(this.inFlight);
- } catch (e) {
+ } catch (e) { // eslint-disable-line no-unused-vars
// NOP here, as we'll handle it when we scan the list
}
- this.logger.debug(_scope, { msg: 'race completed' });
// Address settled promises..
- const settled = this._handleWatchedList(this._watchedHandler.bind(this));
- this.logger.debug(_scope, { settled });
+ this._handleWatchedList(this._watchedHandler.bind(this));
// Try to fill the vacancy
// TODO: maybe rate-limit this call based on slot availability