worker tasks now share one db context, rather than one per task
[websub-hub] / src / worker.js
index ca77369aa3646fa871547bc67f8bd5c49e65b6d4..9b6fd7ab882752f00dc76ae693db8980e8eadc76 100644 (file)
@@ -12,22 +12,25 @@ const _fileScope = common.fileScope(__filename);
 
 /**
  * @callback Worker~promiseGiver
+ * @param {*} dbCtx
  * @param {number} atMost
  * @returns {Promise<void>[]}
  */
 
 class Worker {
   /**
-   * @param {object} logger 
-   * @param {Worker~promiseGiver} promiseGiver 
+   * @param {object} logger
+   * @param {object} db
+   * @param {Worker~promiseGiver} promiseGiver
    * @param {object} options
    * @param {object} options.worker
    * @param {object} options.worker.pollingEnabled
    * @param {number} options.worker.recurrSleepMs
    * @param {number} options.worker.concurrency
    */
-  constructor(logger, promiseGiver, options) {
+  constructor(logger, db, promiseGiver, options) {
     this.logger = logger;
+    this.db = db;
     this.options = options;
     if (!promiseGiver || typeof promiseGiver !== 'function') {
       throw new TypeError('function required');
@@ -129,14 +132,15 @@ class Worker {
 
   /**
    * Refill the workpool with our special promises.
+   * @param {*} dbCtx
    * @returns {Promise[]}
    */
-  async _getWork() {
+  async _getWork(dbCtx) {
     const _scope = _fileScope('_getWork');
     let newPromises = [];
     const wanted = this.concurrency - this.inFlight.length;
     if (wanted > 0) {
-      newPromises = await this.promiseGiver(wanted);
+      newPromises = await this.promiseGiver(dbCtx, wanted);
       newPromises = newPromises.map((p) => Worker.watchedPromise(p));
       common.stackSafePush(this.inFlight, newPromises);
     }
@@ -178,29 +182,38 @@ class Worker {
     // Interrupt any pending sleep, if we were called out of timeout-cycle.
     clearTimeout(this.nextTimeout);
 
-    // Try to fill the hopper
-    await this._getWork();
-
-    while (this.inFlight.length > 0) {
-      /* Wait for one or more to be resolved.
-       * We don't care what the result was, as we have to scan the list
-       * for all settled promises anyhow, and our wrapper has stored the
-       * results.
-       */
-      try {
-        await Promise.race(this.inFlight);
-      } catch (e) {
-        // NOP here, as we'll handle it when we scan the list
-      }
-      this.logger.debug(_scope, { msg: 'race completed' });
-
-      // Address settled promises..
-      const settled = this._handleWatchedList(this._watchedHandler.bind(this));
-      this.logger.debug(_scope, { settled });
-      
-      // Try to fill the vacancy
-      // TODO: maybe rate-limit this call based on slot availability
-      await this._getWork();
+    // Share one db connection for all tasks.
+    try {
+      await this.db.context(async (dbCtx) => {
+
+        // Try to fill the hopper
+        await this._getWork(dbCtx);
+
+        while (this.inFlight.length > 0) {
+          /* Wait for one or more to be resolved.
+          * We don't care what the result was, as we have to scan the list
+          * for all settled promises anyhow, and our wrapper has stored the
+          * results.
+          */
+          try {
+            await Promise.race(this.inFlight);
+          } catch (e) {
+            // NOP here, as we'll handle it when we scan the list
+          }
+          this.logger.debug(_scope, { msg: 'race completed' });
+
+          // Address settled promises..
+          const settled = this._handleWatchedList(this._watchedHandler.bind(this));
+          this.logger.debug(_scope, { settled });
+
+          // Try to fill the vacancy
+          // TODO: maybe rate-limit this call based on slot availability
+          await this._getWork(dbCtx);
+        }
+      }); // dbCtx
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e });
+      // Try again later anyhow.
     }
 
     // No more work, wait a while and retry