X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=src%2Fworker.js;h=ebf891405124e730881b96c1b69d4bf4593be689;hb=HEAD;hp=ca77369aa3646fa871547bc67f8bd5c49e65b6d4;hpb=38aba0869dc3ade99d439e74cbc6239b4fa1f632;p=websub-hub diff --git a/src/worker.js b/src/worker.js index ca77369..ebf8914 100644 --- a/src/worker.js +++ b/src/worker.js @@ -12,22 +12,25 @@ const _fileScope = common.fileScope(__filename); /** * @callback Worker~promiseGiver + * @param {*} dbCtx * @param {number} atMost * @returns {Promise[]} */ class Worker { /** - * @param {object} logger - * @param {Worker~promiseGiver} promiseGiver + * @param {object} logger + * @param {object} db + * @param {Worker~promiseGiver} promiseGiver * @param {object} options * @param {object} options.worker * @param {object} options.worker.pollingEnabled * @param {number} options.worker.recurrSleepMs * @param {number} options.worker.concurrency */ - constructor(logger, promiseGiver, options) { + constructor(logger, db, promiseGiver, options) { this.logger = logger; + this.db = db; this.options = options; if (!promiseGiver || typeof promiseGiver !== 'function') { throw new TypeError('function required'); @@ -39,6 +42,7 @@ class Worker { this.inFlight = []; // Our work heap of Promises this.nextTimeout = undefined; // Allow clearTimeout() to reset waiting period. this.running = false; + this.isProcessing = false; // Only let one process() method execute on the work heap at a time } /** @@ -129,14 +133,15 @@ class Worker { /** * Refill the workpool with our special promises. + * @param {*} dbCtx * @returns {Promise[]} */ - async _getWork() { + async _getWork(dbCtx) { const _scope = _fileScope('_getWork'); let newPromises = []; const wanted = this.concurrency - this.inFlight.length; if (wanted > 0) { - newPromises = await this.promiseGiver(wanted); + newPromises = await this.promiseGiver(dbCtx, wanted); newPromises = newPromises.map((p) => Worker.watchedPromise(p)); common.stackSafePush(this.inFlight, newPromises); } @@ -173,38 +178,54 @@ class Worker { async process() { const _scope = _fileScope('process'); - this.logger.debug(_scope, 'called', {}); + this.logger.debug(_scope, 'called', { isProcessing: this.isProcessing }); + + + if (this.isProcessing) { + return; + } + this.isProcessing = true; // Interrupt any pending sleep, if we were called out of timeout-cycle. clearTimeout(this.nextTimeout); - // Try to fill the hopper - await this._getWork(); - - while (this.inFlight.length > 0) { - /* Wait for one or more to be resolved. - * We don't care what the result was, as we have to scan the list - * for all settled promises anyhow, and our wrapper has stored the - * results. - */ - try { - await Promise.race(this.inFlight); - } catch (e) { - // NOP here, as we'll handle it when we scan the list - } - this.logger.debug(_scope, { msg: 'race completed' }); - - // Address settled promises.. - const settled = this._handleWatchedList(this._watchedHandler.bind(this)); - this.logger.debug(_scope, { settled }); - - // Try to fill the vacancy - // TODO: maybe rate-limit this call based on slot availability - await this._getWork(); + try { + await this.db.context(async (dbCtx) => { + + // Try to fill the hopper + await this._getWork(dbCtx); + + while (this.inFlight.length > 0) { + /* Wait for one or more to be resolved. + * We don't care what the result was, as we have to scan the list + * for all settled promises anyhow, and our wrapper has stored the + * results. + */ + try { + await Promise.race(this.inFlight); + } catch (e) { + // NOP here, as we'll handle it when we scan the list + } + this.logger.debug(_scope, { msg: 'race completed' }); + + // Address settled promises.. + const settled = this._handleWatchedList(this._watchedHandler.bind(this)); + this.logger.debug(_scope, { settled }); + + // Try to fill the vacancy + // TODO: maybe rate-limit this call based on slot availability + await this._getWork(dbCtx); + } + }); // dbCtx + } catch (e) { + this.logger.error(_scope, 'failed', { error: e }); + // Try again later anyhow. } // No more work, wait a while and retry this._recurr(); + + this.isProcessing = false; } }