Merge branch 'v1.3-dev' as v1.3.11
[websub-hub] / src / worker.js
index ca77369aa3646fa871547bc67f8bd5c49e65b6d4..ebf891405124e730881b96c1b69d4bf4593be689 100644 (file)
@@ -12,22 +12,25 @@ const _fileScope = common.fileScope(__filename);
 
 /**
  * @callback Worker~promiseGiver
+ * @param {*} dbCtx
  * @param {number} atMost
  * @returns {Promise<void>[]}
  */
 
 class Worker {
   /**
-   * @param {object} logger 
-   * @param {Worker~promiseGiver} promiseGiver 
+   * @param {object} logger
+   * @param {object} db
+   * @param {Worker~promiseGiver} promiseGiver
    * @param {object} options
    * @param {object} options.worker
    * @param {object} options.worker.pollingEnabled
    * @param {number} options.worker.recurrSleepMs
    * @param {number} options.worker.concurrency
    */
-  constructor(logger, promiseGiver, options) {
+  constructor(logger, db, promiseGiver, options) {
     this.logger = logger;
+    this.db = db;
     this.options = options;
     if (!promiseGiver || typeof promiseGiver !== 'function') {
       throw new TypeError('function required');
@@ -39,6 +42,7 @@ class Worker {
     this.inFlight = []; // Our work heap of Promises  
     this.nextTimeout = undefined; // Allow clearTimeout() to reset waiting period.
     this.running = false;
+    this.isProcessing = false; // Only let one process() method execute on the work heap at a time
   }
 
   /**
@@ -129,14 +133,15 @@ class Worker {
 
   /**
    * Refill the workpool with our special promises.
+   * @param {*} dbCtx
    * @returns {Promise[]}
    */
-  async _getWork() {
+  async _getWork(dbCtx) {
     const _scope = _fileScope('_getWork');
     let newPromises = [];
     const wanted = this.concurrency - this.inFlight.length;
     if (wanted > 0) {
-      newPromises = await this.promiseGiver(wanted);
+      newPromises = await this.promiseGiver(dbCtx, wanted);
       newPromises = newPromises.map((p) => Worker.watchedPromise(p));
       common.stackSafePush(this.inFlight, newPromises);
     }
@@ -173,38 +178,54 @@ class Worker {
   async process() {
     const _scope = _fileScope('process');
 
-    this.logger.debug(_scope, 'called', {});
+    this.logger.debug(_scope, 'called', { isProcessing: this.isProcessing });
+
+
+    if (this.isProcessing) {
+      return;
+    }
+    this.isProcessing = true;
 
     // Interrupt any pending sleep, if we were called out of timeout-cycle.
     clearTimeout(this.nextTimeout);
 
-    // Try to fill the hopper
-    await this._getWork();
-
-    while (this.inFlight.length > 0) {
-      /* Wait for one or more to be resolved.
-       * We don't care what the result was, as we have to scan the list
-       * for all settled promises anyhow, and our wrapper has stored the
-       * results.
-       */
-      try {
-        await Promise.race(this.inFlight);
-      } catch (e) {
-        // NOP here, as we'll handle it when we scan the list
-      }
-      this.logger.debug(_scope, { msg: 'race completed' });
-
-      // Address settled promises..
-      const settled = this._handleWatchedList(this._watchedHandler.bind(this));
-      this.logger.debug(_scope, { settled });
-      
-      // Try to fill the vacancy
-      // TODO: maybe rate-limit this call based on slot availability
-      await this._getWork();
+    try {
+      await this.db.context(async (dbCtx) => {
+
+        // Try to fill the hopper
+        await this._getWork(dbCtx);
+
+        while (this.inFlight.length > 0) {
+          /* Wait for one or more to be resolved.
+          * We don't care what the result was, as we have to scan the list
+          * for all settled promises anyhow, and our wrapper has stored the
+          * results.
+          */
+          try {
+            await Promise.race(this.inFlight);
+          } catch (e) {
+            // NOP here, as we'll handle it when we scan the list
+          }
+          this.logger.debug(_scope, { msg: 'race completed' });
+
+          // Address settled promises..
+          const settled = this._handleWatchedList(this._watchedHandler.bind(this));
+          this.logger.debug(_scope, { settled });
+
+          // Try to fill the vacancy
+          // TODO: maybe rate-limit this call based on slot availability
+          await this._getWork(dbCtx);
+        }
+      }); // dbCtx
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e });
+      // Try again later anyhow.
     }
 
     // No more work, wait a while and retry
     this._recurr();
+
+    this.isProcessing = false;
   }
 
 }