Merge branch 'v1.3-dev'
[websub-hub] / src / worker.js
index ca77369aa3646fa871547bc67f8bd5c49e65b6d4..4464af4ecba50ae95a67509c6ed127c230d699b2 100644 (file)
@@ -11,23 +11,26 @@ const _fileScope = common.fileScope(__filename);
  */
 
 /**
- * @callback Worker~promiseGiver
+ * @callback PromiseGiver
+ * @param {*} dbCtx
  * @param {number} atMost
  * @returns {Promise<void>[]}
  */
 
 class Worker {
   /**
-   * @param {object} logger 
-   * @param {Worker~promiseGiver} promiseGiver 
-   * @param {object} options
-   * @param {object} options.worker
-   * @param {object} options.worker.pollingEnabled
-   * @param {number} options.worker.recurrSleepMs
-   * @param {number} options.worker.concurrency
+   * @param {object} logger logger instance
+   * @param {object} db db instance
+   * @param {PromiseGiver} promiseGiver function which fetches and processes work
+   * @param {object} options options
+   * @param {object} options.worker worker options
+   * @param {object} options.worker.pollingEnabled whether to run worker at all
+   * @param {number} options.worker.recurrSleepMs time between processing runs
+   * @param {number} options.worker.concurrency how much work to be working on at once
    */
-  constructor(logger, promiseGiver, options) {
+  constructor(logger, db, promiseGiver, options) {
     this.logger = logger;
+    this.db = db;
     this.options = options;
     if (!promiseGiver || typeof promiseGiver !== 'function') {
       throw new TypeError('function required');
@@ -38,11 +41,13 @@ class Worker {
     this.recurrSleepMs = this.options.worker.recurrSleepMs;
     this.inFlight = []; // Our work heap of Promises  
     this.nextTimeout = undefined; // Allow clearTimeout() to reset waiting period.
-    this.running = false;
+    this.running = false; // Worker is running.
+    this.isProcessing = false; // Only let one process() method execute on the work heap at a time
   }
 
   /**
    * Begin the scheduled loop.
+   * @param {number} stagger vary startup time by some fraction of recurrence
    */
   start(stagger = 0.618) {
     const _scope = _fileScope('start');
@@ -73,17 +78,17 @@ class Worker {
    * results, and use the race as a sort of condvar for checking everything
    * in the list of what we were waiting for.
    * NB this means promise cannot be further chained, or it loses the magic.
-   * @param {Promise} promise
+   * @param {Promise} promise promise to watch
    * @returns {Promise} watchedPromise
    */
   static watchedPromise(promise) {
-    if (Object.prototype.hasOwnProperty.call(promise, 'isSettled')) {
+    if (Object.hasOwn(promise, 'isSettled')) {
       return promise;
     }
 
     let isSettled = false;
-    let resolved = undefined;
-    let rejected = undefined;
+    let resolved;
+    let rejected;
 
     promise = promise.then(
       (res) => {
@@ -106,12 +111,17 @@ class Worker {
     return promise;
   }
 
+  /**
+   * @callback HandlerFunction
+   * @param {*} resolved
+   * @param {*} rejected
+   * @returns {void}
+   */
   /**
    * Process the list of promises, removing any which have settled,
    * and passes their fulfilled values to the handler.
-   *
-   * @param {HandlerFunction} handler 
-   * @returns {number} handled
+   * @param {HandlerFunction} handler invoked on settled promises
+   * @returns {number} handled promises removed from inFlight list
    */
   _handleWatchedList(handler) {
     let handled = 0;
@@ -129,14 +139,15 @@ class Worker {
 
   /**
    * Refill the workpool with our special promises.
-   * @returns {Promise[]}
+   * @param {*} dbCtx db context
+   * @returns {Promise<Promise[]>} wrapped promises
    */
-  async _getWork() {
+  async _getWork(dbCtx) {
     const _scope = _fileScope('_getWork');
     let newPromises = [];
     const wanted = this.concurrency - this.inFlight.length;
     if (wanted > 0) {
-      newPromises = await this.promiseGiver(wanted);
+      newPromises = await this.promiseGiver(dbCtx, wanted);
       newPromises = newPromises.map((p) => Worker.watchedPromise(p));
       common.stackSafePush(this.inFlight, newPromises);
     }
@@ -146,15 +157,16 @@ class Worker {
 
   /**
    * Simply log results of promises, for now.
-   * @param {*} resolved 
-   * @param {*} rejected 
+   * @param {*} resolved promise resolution value
+   * @param {*} rejected promise rejection value
    */
   _watchedHandler(resolved, rejected) {
     const _scope = _fileScope('_watchedHandler');
 
-    this.logger.debug(_scope, { resolved, rejected });
     if (rejected) {
-      this.logger.error(_scope, { rejected });
+      this.logger.error(_scope, 'rejected', { rejected });
+    } else {
+      this.logger.debug(_scope, 'resolved', { resolved });
     }
   }
 
@@ -173,38 +185,53 @@ class Worker {
   async process() {
     const _scope = _fileScope('process');
 
-    this.logger.debug(_scope, 'called', {});
+    this.logger.debug(_scope, 'called', { isProcessing: this.isProcessing });
+
+
+    if (this.isProcessing) {
+      return;
+    }
+    this.isProcessing = true;
 
     // Interrupt any pending sleep, if we were called out of timeout-cycle.
     clearTimeout(this.nextTimeout);
+    this.nextTimeout = undefined;
 
-    // Try to fill the hopper
-    await this._getWork();
-
-    while (this.inFlight.length > 0) {
-      /* Wait for one or more to be resolved.
-       * We don't care what the result was, as we have to scan the list
-       * for all settled promises anyhow, and our wrapper has stored the
-       * results.
-       */
-      try {
-        await Promise.race(this.inFlight);
-      } catch (e) {
-        // NOP here, as we'll handle it when we scan the list
-      }
-      this.logger.debug(_scope, { msg: 'race completed' });
-
-      // Address settled promises..
-      const settled = this._handleWatchedList(this._watchedHandler.bind(this));
-      this.logger.debug(_scope, { settled });
-      
-      // Try to fill the vacancy
-      // TODO: maybe rate-limit this call based on slot availability
-      await this._getWork();
+    try {
+      await this.db.context(async (dbCtx) => {
+
+        // Try to fill the hopper
+        await this._getWork(dbCtx);
+
+        while (this.inFlight.length > 0) {
+          /* Wait for one or more to be resolved.
+          * We don't care what the result was, as we have to scan the list
+          * for all settled promises anyhow, and our wrapper has stored the
+          * results.
+          */
+          try {
+            await Promise.race(this.inFlight);
+          } catch (e) { // eslint-disable-line no-unused-vars
+            // NOP here, as we'll handle it when we scan the list
+          }
+
+          // Address settled promises..
+          this._handleWatchedList(this._watchedHandler.bind(this));
+
+          // Try to fill the vacancy
+          // TODO: maybe rate-limit this call based on slot availability
+          await this._getWork(dbCtx);
+        }
+      }); // dbCtx
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e });
+      // Try again later anyhow.
     }
 
     // No more work, wait a while and retry
     this._recurr();
+
+    this.isProcessing = false;
   }
 
 }