*/
/**
- * @callback Worker~promiseGiver
+ * @callback PromiseGiver
+ * @param {*} dbCtx
* @param {number} atMost
* @returns {Promise<void>[]}
*/
class Worker {
/**
- * @param {object} logger
- * @param {Worker~promiseGiver} promiseGiver
- * @param {object} options
- * @param {object} options.worker
- * @param {object} options.worker.pollingEnabled
- * @param {number} options.worker.recurrSleepMs
- * @param {number} options.worker.concurrency
+ * @param {object} logger logger instance
+ * @param {object} db db instance
+ * @param {PromiseGiver} promiseGiver function which fetches and processes work
+ * @param {object} options options
+ * @param {object} options.worker worker options
+ * @param {object} options.worker.pollingEnabled whether to run worker at all
+ * @param {number} options.worker.recurrSleepMs time between processing runs
+ * @param {number} options.worker.concurrency how much work to be working on at once
*/
- constructor(logger, promiseGiver, options) {
+ constructor(logger, db, promiseGiver, options) {
this.logger = logger;
+ this.db = db;
this.options = options;
if (!promiseGiver || typeof promiseGiver !== 'function') {
throw new TypeError('function required');
this.recurrSleepMs = this.options.worker.recurrSleepMs;
this.inFlight = []; // Our work heap of Promises
this.nextTimeout = undefined; // Allow clearTimeout() to reset waiting period.
- this.running = false;
+ this.running = false; // Worker is running.
+ this.isProcessing = false; // Only let one process() method execute on the work heap at a time
}
/**
* Begin the scheduled loop.
+ * @param {number} stagger vary startup time by some fraction of recurrence
*/
start(stagger = 0.618) {
const _scope = _fileScope('start');
* results, and use the race as a sort of condvar for checking everything
* in the list of what we were waiting for.
* NB this means promise cannot be further chained, or it loses the magic.
- * @param {Promise} promise
+ * @param {Promise} promise promise to watch
* @returns {Promise} watchedPromise
*/
static watchedPromise(promise) {
- if (Object.prototype.hasOwnProperty.call(promise, 'isSettled')) {
+ if (Object.hasOwn(promise, 'isSettled')) {
return promise;
}
let isSettled = false;
- let resolved = undefined;
- let rejected = undefined;
+ let resolved;
+ let rejected;
promise = promise.then(
(res) => {
return promise;
}
+ /**
+ * @callback HandlerFunction
+ * @param {*} resolved
+ * @param {*} rejected
+ * @returns {void}
+ */
/**
* Process the list of promises, removing any which have settled,
* and passes their fulfilled values to the handler.
- *
- * @param {HandlerFunction} handler
- * @returns {number} handled
+ * @param {HandlerFunction} handler invoked on settled promises
+ * @returns {number} handled promises removed from inFlight list
*/
_handleWatchedList(handler) {
let handled = 0;
/**
* Refill the workpool with our special promises.
- * @returns {Promise[]}
+ * @param {*} dbCtx db context
+ * @returns {Promise<Promise[]>} wrapped promises
*/
- async _getWork() {
+ async _getWork(dbCtx) {
const _scope = _fileScope('_getWork');
let newPromises = [];
const wanted = this.concurrency - this.inFlight.length;
if (wanted > 0) {
- newPromises = await this.promiseGiver(wanted);
+ newPromises = await this.promiseGiver(dbCtx, wanted);
newPromises = newPromises.map((p) => Worker.watchedPromise(p));
common.stackSafePush(this.inFlight, newPromises);
}
/**
* Simply log results of promises, for now.
- * @param {*} resolved
- * @param {*} rejected
+ * @param {*} resolved promise resolution value
+ * @param {*} rejected promise rejection value
*/
_watchedHandler(resolved, rejected) {
const _scope = _fileScope('_watchedHandler');
- this.logger.debug(_scope, { resolved, rejected });
if (rejected) {
- this.logger.error(_scope, { rejected });
+ this.logger.error(_scope, 'rejected', { rejected });
+ } else {
+ this.logger.debug(_scope, 'resolved', { resolved });
}
}
async process() {
const _scope = _fileScope('process');
- this.logger.debug(_scope, 'called', {});
+ this.logger.debug(_scope, 'called', { isProcessing: this.isProcessing });
+
+
+ if (this.isProcessing) {
+ return;
+ }
+ this.isProcessing = true;
// Interrupt any pending sleep, if we were called out of timeout-cycle.
clearTimeout(this.nextTimeout);
+ this.nextTimeout = undefined;
- // Try to fill the hopper
- await this._getWork();
-
- while (this.inFlight.length > 0) {
- /* Wait for one or more to be resolved.
- * We don't care what the result was, as we have to scan the list
- * for all settled promises anyhow, and our wrapper has stored the
- * results.
- */
- try {
- await Promise.race(this.inFlight);
- } catch (e) {
- // NOP here, as we'll handle it when we scan the list
- }
- this.logger.debug(_scope, { msg: 'race completed' });
-
- // Address settled promises..
- const settled = this._handleWatchedList(this._watchedHandler.bind(this));
- this.logger.debug(_scope, { settled });
-
- // Try to fill the vacancy
- // TODO: maybe rate-limit this call based on slot availability
- await this._getWork();
+ try {
+ await this.db.context(async (dbCtx) => {
+
+ // Try to fill the hopper
+ await this._getWork(dbCtx);
+
+ while (this.inFlight.length > 0) {
+ /* Wait for one or more to be resolved.
+ * We don't care what the result was, as we have to scan the list
+ * for all settled promises anyhow, and our wrapper has stored the
+ * results.
+ */
+ try {
+ await Promise.race(this.inFlight);
+ } catch (e) { // eslint-disable-line no-unused-vars
+ // NOP here, as we'll handle it when we scan the list
+ }
+
+ // Address settled promises..
+ this._handleWatchedList(this._watchedHandler.bind(this));
+
+ // Try to fill the vacancy
+ // TODO: maybe rate-limit this call based on slot availability
+ await this._getWork(dbCtx);
+ }
+ }); // dbCtx
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e });
+ // Try again later anyhow.
}
// No more work, wait a while and retry
this._recurr();
+
+ this.isProcessing = false;
}
}