update dependencies and devDependencies, fix lint issues
[websub-hub] / src / worker.js
index ebf891405124e730881b96c1b69d4bf4593be689..4464af4ecba50ae95a67509c6ed127c230d699b2 100644 (file)
@@ -11,7 +11,7 @@ const _fileScope = common.fileScope(__filename);
  */
 
 /**
- * @callback Worker~promiseGiver
+ * @callback PromiseGiver
  * @param {*} dbCtx
  * @param {number} atMost
  * @returns {Promise<void>[]}
@@ -19,14 +19,14 @@ const _fileScope = common.fileScope(__filename);
 
 class Worker {
   /**
-   * @param {object} logger
-   * @param {object} db
-   * @param {Worker~promiseGiver} promiseGiver
-   * @param {object} options
-   * @param {object} options.worker
-   * @param {object} options.worker.pollingEnabled
-   * @param {number} options.worker.recurrSleepMs
-   * @param {number} options.worker.concurrency
+   * @param {object} logger logger instance
+   * @param {object} db db instance
+   * @param {PromiseGiver} promiseGiver function which fetches and processes work
+   * @param {object} options options
+   * @param {object} options.worker worker options
+   * @param {object} options.worker.pollingEnabled whether to run worker at all
+   * @param {number} options.worker.recurrSleepMs time between processing runs
+   * @param {number} options.worker.concurrency how much work to be working on at once
    */
   constructor(logger, db, promiseGiver, options) {
     this.logger = logger;
@@ -41,12 +41,13 @@ class Worker {
     this.recurrSleepMs = this.options.worker.recurrSleepMs;
     this.inFlight = []; // Our work heap of Promises  
     this.nextTimeout = undefined; // Allow clearTimeout() to reset waiting period.
-    this.running = false;
+    this.running = false; // Worker is running.
     this.isProcessing = false; // Only let one process() method execute on the work heap at a time
   }
 
   /**
    * Begin the scheduled loop.
+   * @param {number} stagger vary startup time by some fraction of recurrence
    */
   start(stagger = 0.618) {
     const _scope = _fileScope('start');
@@ -77,17 +78,17 @@ class Worker {
    * results, and use the race as a sort of condvar for checking everything
    * in the list of what we were waiting for.
    * NB this means promise cannot be further chained, or it loses the magic.
-   * @param {Promise} promise
+   * @param {Promise} promise promise to watch
    * @returns {Promise} watchedPromise
    */
   static watchedPromise(promise) {
-    if (Object.prototype.hasOwnProperty.call(promise, 'isSettled')) {
+    if (Object.hasOwn(promise, 'isSettled')) {
       return promise;
     }
 
     let isSettled = false;
-    let resolved = undefined;
-    let rejected = undefined;
+    let resolved;
+    let rejected;
 
     promise = promise.then(
       (res) => {
@@ -110,12 +111,17 @@ class Worker {
     return promise;
   }
 
+  /**
+   * @callback HandlerFunction
+   * @param {*} resolved
+   * @param {*} rejected
+   * @returns {void}
+   */
   /**
    * Process the list of promises, removing any which have settled,
    * and passes their fulfilled values to the handler.
-   *
-   * @param {HandlerFunction} handler 
-   * @returns {number} handled
+   * @param {HandlerFunction} handler invoked on settled promises
+   * @returns {number} handled promises removed from inFlight list
    */
   _handleWatchedList(handler) {
     let handled = 0;
@@ -133,8 +139,8 @@ class Worker {
 
   /**
    * Refill the workpool with our special promises.
-   * @param {*} dbCtx
-   * @returns {Promise[]}
+   * @param {*} dbCtx db context
+   * @returns {Promise<Promise[]>} wrapped promises
    */
   async _getWork(dbCtx) {
     const _scope = _fileScope('_getWork');
@@ -151,15 +157,16 @@ class Worker {
 
   /**
    * Simply log results of promises, for now.
-   * @param {*} resolved 
-   * @param {*} rejected 
+   * @param {*} resolved promise resolution value
+   * @param {*} rejected promise rejection value
    */
   _watchedHandler(resolved, rejected) {
     const _scope = _fileScope('_watchedHandler');
 
-    this.logger.debug(_scope, { resolved, rejected });
     if (rejected) {
-      this.logger.error(_scope, { rejected });
+      this.logger.error(_scope, 'rejected', { rejected });
+    } else {
+      this.logger.debug(_scope, 'resolved', { resolved });
     }
   }
 
@@ -188,6 +195,7 @@ class Worker {
 
     // Interrupt any pending sleep, if we were called out of timeout-cycle.
     clearTimeout(this.nextTimeout);
+    this.nextTimeout = undefined;
 
     try {
       await this.db.context(async (dbCtx) => {
@@ -203,14 +211,12 @@ class Worker {
           */
           try {
             await Promise.race(this.inFlight);
-          } catch (e) {
+          } catch (e) { // eslint-disable-line no-unused-vars
             // NOP here, as we'll handle it when we scan the list
           }
-          this.logger.debug(_scope, { msg: 'race completed' });
 
           // Address settled promises..
-          const settled = this._handleWatchedList(this._watchedHandler.bind(this));
-          this.logger.debug(_scope, { settled });
+          this._handleWatchedList(this._watchedHandler.bind(this));
 
           // Try to fill the vacancy
           // TODO: maybe rate-limit this call based on slot availability