worker tasks now share one db context, rather than one per task
[websub-hub] / src / communication.js
index 72f2642a003e465aab42909ce13f90b8fd93694c..b67e2c731ff122e09f4e68557a8ddf0b1dec927c 100644 (file)
@@ -49,7 +49,7 @@ class Communication {
       return response;
     });
 
-    this.worker = new Worker(logger, this.workFeed.bind(this), options);
+    this.worker = new Worker(logger, db, this.workFeed.bind(this), options);
     this.worker.start();
   }
 
@@ -647,10 +647,11 @@ class Communication {
 
   /**
    * 
+   * @param {*} dbCtx
    * @param {Number} wanted maximum tasks to claim
    * @returns {Promise<void>[]}
    */
-  async workFeed(wanted) {
+  async workFeed(dbCtx, wanted) {
     const _scope = _fileScope('workFeed');
     const inProgress = [];
     const requestId = common.requestId();
@@ -661,33 +662,29 @@ class Communication {
     this.logger.debug(_scope, 'called', { wanted });
 
     try {
-      await this.db.context(async (dbCtx) => {
-        if (wanted > 0) {
-          // Update topics before anything else.
-          const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
-          // Each task gets a new context, as these map to connections in some dbs.
-          // This dbCtx goes away after launching the processing functions, so would not be available to tasks.
-          topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId)));
-          inProgress.push(...topicFetchPromises);
-          wanted -= topicFetchPromises.length;
-        }
+      if (wanted > 0) {
+        // Update topics before anything else.
+        const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
+        topicFetchPromises = topicFetchIds.map((id) => this.topicFetchProcess(dbCtx, id, requestId));
+        inProgress.push(...topicFetchPromises);
+        wanted -= topicFetchPromises.length;
+      }
 
-        if (wanted > 0) {
-          // Then any pending verifications.
-          const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
-          verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId)));
-          inProgress.push(...verificationPromises);
-          wanted -= verificationPromises.length;
-        }
+      if (wanted > 0) {
+        // Then any pending verifications.
+        const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
+        verificationPromises = verifications.map((id) => this.verificationProcess(dbCtx, id, requestId));
+        inProgress.push(...verificationPromises);
+        wanted -= verificationPromises.length;
+      }
 
-        if (wanted > 0) {
-          // Finally dole out content.
-          const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
-          updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId)));
-          inProgress.push(...updatePromises);
-          wanted -= updatePromises.length;
-        }
-      }); // dbCtx
+      if (wanted > 0) {
+        // Finally dole out content.
+        const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
+        updatePromises = updates.map((id) => this.subscriptionDeliveryProcess(dbCtx, id, requestId));
+        inProgress.push(...updatePromises);
+        wanted -= updatePromises.length;
+      }
     } catch (e) {
       this.logger.error(_scope, 'failed', { error: e });
       // do not re-throw, return what we've claimed so far