X-Git-Url: https://git.squeep.com/?a=blobdiff_plain;f=src%2Fworker.js;h=9b6fd7ab882752f00dc76ae693db8980e8eadc76;hb=9f9d3c8;hp=ca77369aa3646fa871547bc67f8bd5c49e65b6d4;hpb=5e7ab1d9ec27aab6e321c5a647cc1717980d331e;p=websub-hub diff --git a/src/worker.js b/src/worker.js index ca77369..9b6fd7a 100644 --- a/src/worker.js +++ b/src/worker.js @@ -12,22 +12,25 @@ const _fileScope = common.fileScope(__filename); /** * @callback Worker~promiseGiver + * @param {*} dbCtx * @param {number} atMost * @returns {Promise[]} */ class Worker { /** - * @param {object} logger - * @param {Worker~promiseGiver} promiseGiver + * @param {object} logger + * @param {object} db + * @param {Worker~promiseGiver} promiseGiver * @param {object} options * @param {object} options.worker * @param {object} options.worker.pollingEnabled * @param {number} options.worker.recurrSleepMs * @param {number} options.worker.concurrency */ - constructor(logger, promiseGiver, options) { + constructor(logger, db, promiseGiver, options) { this.logger = logger; + this.db = db; this.options = options; if (!promiseGiver || typeof promiseGiver !== 'function') { throw new TypeError('function required'); @@ -129,14 +132,15 @@ class Worker { /** * Refill the workpool with our special promises. + * @param {*} dbCtx * @returns {Promise[]} */ - async _getWork() { + async _getWork(dbCtx) { const _scope = _fileScope('_getWork'); let newPromises = []; const wanted = this.concurrency - this.inFlight.length; if (wanted > 0) { - newPromises = await this.promiseGiver(wanted); + newPromises = await this.promiseGiver(dbCtx, wanted); newPromises = newPromises.map((p) => Worker.watchedPromise(p)); common.stackSafePush(this.inFlight, newPromises); } @@ -178,29 +182,38 @@ class Worker { // Interrupt any pending sleep, if we were called out of timeout-cycle. clearTimeout(this.nextTimeout); - // Try to fill the hopper - await this._getWork(); - - while (this.inFlight.length > 0) { - /* Wait for one or more to be resolved. - * We don't care what the result was, as we have to scan the list - * for all settled promises anyhow, and our wrapper has stored the - * results. - */ - try { - await Promise.race(this.inFlight); - } catch (e) { - // NOP here, as we'll handle it when we scan the list - } - this.logger.debug(_scope, { msg: 'race completed' }); - - // Address settled promises.. - const settled = this._handleWatchedList(this._watchedHandler.bind(this)); - this.logger.debug(_scope, { settled }); - - // Try to fill the vacancy - // TODO: maybe rate-limit this call based on slot availability - await this._getWork(); + // Share one db connection for all tasks. + try { + await this.db.context(async (dbCtx) => { + + // Try to fill the hopper + await this._getWork(dbCtx); + + while (this.inFlight.length > 0) { + /* Wait for one or more to be resolved. + * We don't care what the result was, as we have to scan the list + * for all settled promises anyhow, and our wrapper has stored the + * results. + */ + try { + await Promise.race(this.inFlight); + } catch (e) { + // NOP here, as we'll handle it when we scan the list + } + this.logger.debug(_scope, { msg: 'race completed' }); + + // Address settled promises.. + const settled = this._handleWatchedList(this._watchedHandler.bind(this)); + this.logger.debug(_scope, { settled }); + + // Try to fill the vacancy + // TODO: maybe rate-limit this call based on slot availability + await this._getWork(dbCtx); + } + }); // dbCtx + } catch (e) { + this.logger.error(_scope, 'failed', { error: e }); + // Try again later anyhow. } // No more work, wait a while and retry