X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=src%2Fworker.js;h=5c97c0b24dd33de7dabdceb02ed298320d971188;hb=HEAD;hp=4f566ad9f97209bcc538692cf87b6e412ccbe877;hpb=ed6dc5a66ce0eaf2dd61f9fb7a5ec048944c68ee;p=websub-hub diff --git a/src/worker.js b/src/worker.js index 4f566ad..4464af4 100644 --- a/src/worker.js +++ b/src/worker.js @@ -11,7 +11,7 @@ const _fileScope = common.fileScope(__filename); */ /** - * @callback Worker~promiseGiver + * @callback PromiseGiver * @param {*} dbCtx * @param {number} atMost * @returns {Promise[]} @@ -19,14 +19,14 @@ const _fileScope = common.fileScope(__filename); class Worker { /** - * @param {object} logger - * @param {object} db - * @param {Worker~promiseGiver} promiseGiver - * @param {object} options - * @param {object} options.worker - * @param {object} options.worker.pollingEnabled - * @param {number} options.worker.recurrSleepMs - * @param {number} options.worker.concurrency + * @param {object} logger logger instance + * @param {object} db db instance + * @param {PromiseGiver} promiseGiver function which fetches and processes work + * @param {object} options options + * @param {object} options.worker worker options + * @param {object} options.worker.pollingEnabled whether to run worker at all + * @param {number} options.worker.recurrSleepMs time between processing runs + * @param {number} options.worker.concurrency how much work to be working on at once */ constructor(logger, db, promiseGiver, options) { this.logger = logger; @@ -41,12 +41,13 @@ class Worker { this.recurrSleepMs = this.options.worker.recurrSleepMs; this.inFlight = []; // Our work heap of Promises this.nextTimeout = undefined; // Allow clearTimeout() to reset waiting period. - this.running = false; + this.running = false; // Worker is running. this.isProcessing = false; // Only let one process() method execute on the work heap at a time } /** * Begin the scheduled loop. + * @param {number} stagger vary startup time by some fraction of recurrence */ start(stagger = 0.618) { const _scope = _fileScope('start'); @@ -77,17 +78,17 @@ class Worker { * results, and use the race as a sort of condvar for checking everything * in the list of what we were waiting for. * NB this means promise cannot be further chained, or it loses the magic. - * @param {Promise} promise + * @param {Promise} promise promise to watch * @returns {Promise} watchedPromise */ static watchedPromise(promise) { - if (Object.prototype.hasOwnProperty.call(promise, 'isSettled')) { + if (Object.hasOwn(promise, 'isSettled')) { return promise; } let isSettled = false; - let resolved = undefined; - let rejected = undefined; + let resolved; + let rejected; promise = promise.then( (res) => { @@ -110,12 +111,17 @@ class Worker { return promise; } + /** + * @callback HandlerFunction + * @param {*} resolved + * @param {*} rejected + * @returns {void} + */ /** * Process the list of promises, removing any which have settled, * and passes their fulfilled values to the handler. - * - * @param {HandlerFunction} handler - * @returns {number} handled + * @param {HandlerFunction} handler invoked on settled promises + * @returns {number} handled promises removed from inFlight list */ _handleWatchedList(handler) { let handled = 0; @@ -133,8 +139,8 @@ class Worker { /** * Refill the workpool with our special promises. - * @param {*} dbCtx - * @returns {Promise[]} + * @param {*} dbCtx db context + * @returns {Promise} wrapped promises */ async _getWork(dbCtx) { const _scope = _fileScope('_getWork'); @@ -151,15 +157,16 @@ class Worker { /** * Simply log results of promises, for now. - * @param {*} resolved - * @param {*} rejected + * @param {*} resolved promise resolution value + * @param {*} rejected promise rejection value */ _watchedHandler(resolved, rejected) { const _scope = _fileScope('_watchedHandler'); - this.logger.debug(_scope, { resolved, rejected }); if (rejected) { - this.logger.error(_scope, { rejected }); + this.logger.error(_scope, 'rejected', { rejected }); + } else { + this.logger.debug(_scope, 'resolved', { resolved }); } } @@ -188,8 +195,8 @@ class Worker { // Interrupt any pending sleep, if we were called out of timeout-cycle. clearTimeout(this.nextTimeout); + this.nextTimeout = undefined; - // Share one db connection for all tasks. try { await this.db.context(async (dbCtx) => { @@ -204,14 +211,12 @@ class Worker { */ try { await Promise.race(this.inFlight); - } catch (e) { + } catch (e) { // eslint-disable-line no-unused-vars // NOP here, as we'll handle it when we scan the list } - this.logger.debug(_scope, { msg: 'race completed' }); // Address settled promises.. - const settled = this._handleWatchedList(this._watchedHandler.bind(this)); - this.logger.debug(_scope, { settled }); + this._handleWatchedList(this._watchedHandler.bind(this)); // Try to fill the vacancy // TODO: maybe rate-limit this call based on slot availability