X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=src%2Fworker.js;h=5c97c0b24dd33de7dabdceb02ed298320d971188;hb=HEAD;hp=f956ba1d3cb61935a74236644be4307e276f5c3d;hpb=9696c012e6b9a6c58904baa397ca0ebf78112316;p=websub-hub diff --git a/src/worker.js b/src/worker.js index f956ba1..4464af4 100644 --- a/src/worker.js +++ b/src/worker.js @@ -11,23 +11,26 @@ const _fileScope = common.fileScope(__filename); */ /** - * @callback Worker~promiseGiver + * @callback PromiseGiver + * @param {*} dbCtx * @param {number} atMost * @returns {Promise[]} */ class Worker { /** - * @param {object} logger - * @param {Worker~promiseGiver} promiseGiver - * @param {object} options - * @param {object} options.worker - * @param {object} options.worker.pollingEnabled - * @param {number} options.worker.recurrSleepMs - * @param {number} options.worker.concurrency + * @param {object} logger logger instance + * @param {object} db db instance + * @param {PromiseGiver} promiseGiver function which fetches and processes work + * @param {object} options options + * @param {object} options.worker worker options + * @param {object} options.worker.pollingEnabled whether to run worker at all + * @param {number} options.worker.recurrSleepMs time between processing runs + * @param {number} options.worker.concurrency how much work to be working on at once */ - constructor(logger, promiseGiver, options) { + constructor(logger, db, promiseGiver, options) { this.logger = logger; + this.db = db; this.options = options; if (!promiseGiver || typeof promiseGiver !== 'function') { throw new TypeError('function required'); @@ -38,11 +41,13 @@ class Worker { this.recurrSleepMs = this.options.worker.recurrSleepMs; this.inFlight = []; // Our work heap of Promises this.nextTimeout = undefined; // Allow clearTimeout() to reset waiting period. - this.running = false; + this.running = false; // Worker is running. + this.isProcessing = false; // Only let one process() method execute on the work heap at a time } /** * Begin the scheduled loop. + * @param {number} stagger vary startup time by some fraction of recurrence */ start(stagger = 0.618) { const _scope = _fileScope('start'); @@ -73,17 +78,17 @@ class Worker { * results, and use the race as a sort of condvar for checking everything * in the list of what we were waiting for. * NB this means promise cannot be further chained, or it loses the magic. - * @param {Promise} promise + * @param {Promise} promise promise to watch * @returns {Promise} watchedPromise */ static watchedPromise(promise) { - if (Object.prototype.hasOwnProperty.call(promise, 'isSettled')) { + if (Object.hasOwn(promise, 'isSettled')) { return promise; } let isSettled = false; - let resolved = undefined; - let rejected = undefined; + let resolved; + let rejected; promise = promise.then( (res) => { @@ -95,7 +100,7 @@ class Worker { isSettled = true; rejected = rej; throw rej; - }); + }); Object.defineProperties(promise, { isSettled: { get: () => isSettled }, @@ -106,12 +111,17 @@ class Worker { return promise; } + /** + * @callback HandlerFunction + * @param {*} resolved + * @param {*} rejected + * @returns {void} + */ /** * Process the list of promises, removing any which have settled, * and passes their fulfilled values to the handler. - * - * @param {HandlerFunction} handler - * @returns {number} handled + * @param {HandlerFunction} handler invoked on settled promises + * @returns {number} handled promises removed from inFlight list */ _handleWatchedList(handler) { let handled = 0; @@ -129,14 +139,15 @@ class Worker { /** * Refill the workpool with our special promises. - * @returns {Promise[]} + * @param {*} dbCtx db context + * @returns {Promise} wrapped promises */ - async _getWork() { + async _getWork(dbCtx) { const _scope = _fileScope('_getWork'); let newPromises = []; const wanted = this.concurrency - this.inFlight.length; if (wanted > 0) { - newPromises = await this.promiseGiver(wanted); + newPromises = await this.promiseGiver(dbCtx, wanted); newPromises = newPromises.map((p) => Worker.watchedPromise(p)); common.stackSafePush(this.inFlight, newPromises); } @@ -146,15 +157,16 @@ class Worker { /** * Simply log results of promises, for now. - * @param {*} resolved - * @param {*} rejected + * @param {*} resolved promise resolution value + * @param {*} rejected promise rejection value */ _watchedHandler(resolved, rejected) { const _scope = _fileScope('_watchedHandler'); - this.logger.debug(_scope, { resolved, rejected }); if (rejected) { - this.logger.error(_scope, { rejected }); + this.logger.error(_scope, 'rejected', { rejected }); + } else { + this.logger.debug(_scope, 'resolved', { resolved }); } } @@ -173,38 +185,53 @@ class Worker { async process() { const _scope = _fileScope('process'); - this.logger.debug(_scope, 'called', {}); + this.logger.debug(_scope, 'called', { isProcessing: this.isProcessing }); + + + if (this.isProcessing) { + return; + } + this.isProcessing = true; // Interrupt any pending sleep, if we were called out of timeout-cycle. clearTimeout(this.nextTimeout); + this.nextTimeout = undefined; - // Try to fill the hopper - await this._getWork(); - - while (this.inFlight.length > 0) { - /* Wait for one or more to be resolved. - * We don't care what the result was, as we have to scan the list - * for all settled promises anyhow, and our wrapper has stored the - * results. - */ - try { - await Promise.race(this.inFlight); - } catch (e) { - // NOP here, as we'll handle it when we scan the list - } - this.logger.debug(_scope, { msg: 'race completed' }); - - // Address settled promises.. - const settled = this._handleWatchedList(this._watchedHandler.bind(this)); - this.logger.debug(_scope, { settled }); - - // Try to fill the vacancy - // TODO: maybe rate-limit this call based on slot availability - await this._getWork(); + try { + await this.db.context(async (dbCtx) => { + + // Try to fill the hopper + await this._getWork(dbCtx); + + while (this.inFlight.length > 0) { + /* Wait for one or more to be resolved. + * We don't care what the result was, as we have to scan the list + * for all settled promises anyhow, and our wrapper has stored the + * results. + */ + try { + await Promise.race(this.inFlight); + } catch (e) { // eslint-disable-line no-unused-vars + // NOP here, as we'll handle it when we scan the list + } + + // Address settled promises.. + this._handleWatchedList(this._watchedHandler.bind(this)); + + // Try to fill the vacancy + // TODO: maybe rate-limit this call based on slot availability + await this._getWork(dbCtx); + } + }); // dbCtx + } catch (e) { + this.logger.error(_scope, 'failed', { error: e }); + // Try again later anyhow. } // No more work, wait a while and retry this._recurr(); + + this.isProcessing = false; } }