/**
* @callback Worker~promiseGiver
+ * @param {*} dbCtx
* @param {number} atMost
* @returns {Promise<void>[]}
*/
class Worker {
/**
- * @param {object} logger
- * @param {Worker~promiseGiver} promiseGiver
+ * @param {object} logger
+ * @param {object} db
+ * @param {Worker~promiseGiver} promiseGiver
* @param {object} options
* @param {object} options.worker
* @param {object} options.worker.pollingEnabled
* @param {number} options.worker.recurrSleepMs
* @param {number} options.worker.concurrency
*/
- constructor(logger, promiseGiver, options) {
+ constructor(logger, db, promiseGiver, options) {
this.logger = logger;
+ this.db = db;
this.options = options;
if (!promiseGiver || typeof promiseGiver !== 'function') {
throw new TypeError('function required');
this.inFlight = []; // Our work heap of Promises
this.nextTimeout = undefined; // Allow clearTimeout() to reset waiting period.
this.running = false;
+ this.isProcessing = false; // Only let one process() method execute on the work heap at a time
}
/**
/**
* Refill the workpool with our special promises.
+ * @param {*} dbCtx
* @returns {Promise[]}
*/
- async _getWork() {
+ async _getWork(dbCtx) {
const _scope = _fileScope('_getWork');
let newPromises = [];
const wanted = this.concurrency - this.inFlight.length;
if (wanted > 0) {
- newPromises = await this.promiseGiver(wanted);
+ newPromises = await this.promiseGiver(dbCtx, wanted);
newPromises = newPromises.map((p) => Worker.watchedPromise(p));
common.stackSafePush(this.inFlight, newPromises);
}
async process() {
const _scope = _fileScope('process');
- this.logger.debug(_scope, 'called', {});
+ this.logger.debug(_scope, 'called', { isProcessing: this.isProcessing });
+
+
+ if (this.isProcessing) {
+ return;
+ }
+ this.isProcessing = true;
// Interrupt any pending sleep, if we were called out of timeout-cycle.
clearTimeout(this.nextTimeout);
- // Try to fill the hopper
- await this._getWork();
-
- while (this.inFlight.length > 0) {
- /* Wait for one or more to be resolved.
- * We don't care what the result was, as we have to scan the list
- * for all settled promises anyhow, and our wrapper has stored the
- * results.
- */
- try {
- await Promise.race(this.inFlight);
- } catch (e) {
- // NOP here, as we'll handle it when we scan the list
- }
- this.logger.debug(_scope, { msg: 'race completed' });
-
- // Address settled promises..
- const settled = this._handleWatchedList(this._watchedHandler.bind(this));
- this.logger.debug(_scope, { settled });
-
- // Try to fill the vacancy
- // TODO: maybe rate-limit this call based on slot availability
- await this._getWork();
+ try {
+ await this.db.context(async (dbCtx) => {
+
+ // Try to fill the hopper
+ await this._getWork(dbCtx);
+
+ while (this.inFlight.length > 0) {
+ /* Wait for one or more to be resolved.
+ * We don't care what the result was, as we have to scan the list
+ * for all settled promises anyhow, and our wrapper has stored the
+ * results.
+ */
+ try {
+ await Promise.race(this.inFlight);
+ } catch (e) {
+ // NOP here, as we'll handle it when we scan the list
+ }
+ this.logger.debug(_scope, { msg: 'race completed' });
+
+ // Address settled promises..
+ const settled = this._handleWatchedList(this._watchedHandler.bind(this));
+ this.logger.debug(_scope, { settled });
+
+ // Try to fill the vacancy
+ // TODO: maybe rate-limit this call based on slot availability
+ await this._getWork(dbCtx);
+ }
+ }); // dbCtx
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e });
+ // Try again later anyhow.
}
// No more work, wait a while and retry
this._recurr();
+
+ this.isProcessing = false;
}
}