worker tasks return to having their own db conncetions
authorJustin Wind <justin.wind+git@gmail.com>
Mon, 23 Aug 2021 20:19:05 +0000 (13:19 -0700)
committerJustin Wind <justin.wind+git@gmail.com>
Mon, 23 Aug 2021 20:19:05 +0000 (13:19 -0700)
This reverts the intent of 9f9d3c8.  Sharing a connection apparently
means transactions will become interleved.

src/communication.js
src/worker.js

index b67e2c731ff122e09f4e68557a8ddf0b1dec927c..345714f328951f4f7964249a8884882390bb4abc 100644 (file)
@@ -340,7 +340,7 @@ class Communication {
           throw new Errors.InternalInconsistencyError(verification.mode);
       }
 
-      await this.db.verificationComplete(dbCtx, verificationId, verification.callback, verification.topicId);
+      await this.db.verificationComplete(txCtx, verificationId, verification.callback, verification.topicId);
     }); // txCtx
 
     this.logger.info(_scope, 'verification complete', { ...logInfoData, verificationAccepted });
@@ -665,7 +665,7 @@ class Communication {
       if (wanted > 0) {
         // Update topics before anything else.
         const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
-        topicFetchPromises = topicFetchIds.map((id) => this.topicFetchProcess(dbCtx, id, requestId));
+        topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId)));
         inProgress.push(...topicFetchPromises);
         wanted -= topicFetchPromises.length;
       }
@@ -673,7 +673,7 @@ class Communication {
       if (wanted > 0) {
         // Then any pending verifications.
         const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
-        verificationPromises = verifications.map((id) => this.verificationProcess(dbCtx, id, requestId));
+        verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId)));
         inProgress.push(...verificationPromises);
         wanted -= verificationPromises.length;
       }
@@ -681,7 +681,7 @@ class Communication {
       if (wanted > 0) {
         // Finally dole out content.
         const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
-        updatePromises = updates.map((id) => this.subscriptionDeliveryProcess(dbCtx, id, requestId));
+        updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId)));
         inProgress.push(...updatePromises);
         wanted -= updatePromises.length;
       }
index 4f566ad9f97209bcc538692cf87b6e412ccbe877..ebf891405124e730881b96c1b69d4bf4593be689 100644 (file)
@@ -189,7 +189,6 @@ class Worker {
     // Interrupt any pending sleep, if we were called out of timeout-cycle.
     clearTimeout(this.nextTimeout);
 
-    // Share one db connection for all tasks.
     try {
       await this.db.context(async (dbCtx) => {