worker tasks return to having their own db conncetions
[websub-hub] / src / communication.js
index b67e2c731ff122e09f4e68557a8ddf0b1dec927c..345714f328951f4f7964249a8884882390bb4abc 100644 (file)
@@ -340,7 +340,7 @@ class Communication {
           throw new Errors.InternalInconsistencyError(verification.mode);
       }
 
-      await this.db.verificationComplete(dbCtx, verificationId, verification.callback, verification.topicId);
+      await this.db.verificationComplete(txCtx, verificationId, verification.callback, verification.topicId);
     }); // txCtx
 
     this.logger.info(_scope, 'verification complete', { ...logInfoData, verificationAccepted });
@@ -665,7 +665,7 @@ class Communication {
       if (wanted > 0) {
         // Update topics before anything else.
         const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
-        topicFetchPromises = topicFetchIds.map((id) => this.topicFetchProcess(dbCtx, id, requestId));
+        topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId)));
         inProgress.push(...topicFetchPromises);
         wanted -= topicFetchPromises.length;
       }
@@ -673,7 +673,7 @@ class Communication {
       if (wanted > 0) {
         // Then any pending verifications.
         const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
-        verificationPromises = verifications.map((id) => this.verificationProcess(dbCtx, id, requestId));
+        verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId)));
         inProgress.push(...verificationPromises);
         wanted -= verificationPromises.length;
       }
@@ -681,7 +681,7 @@ class Communication {
       if (wanted > 0) {
         // Finally dole out content.
         const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
-        updatePromises = updates.map((id) => this.subscriptionDeliveryProcess(dbCtx, id, requestId));
+        updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId)));
         inProgress.push(...updatePromises);
         wanted -= updatePromises.length;
       }