From: Justin Wind Date: Mon, 23 Aug 2021 20:19:05 +0000 (-0700) Subject: worker tasks return to having their own db conncetions X-Git-Tag: v1.1.5^2~3 X-Git-Url: https://git.squeep.com/?a=commitdiff_plain;h=8ba8751c41f56f0d97f3b694f89ebef568497530;p=websub-hub worker tasks return to having their own db conncetions This reverts the intent of 9f9d3c8. Sharing a connection apparently means transactions will become interleved. --- diff --git a/src/communication.js b/src/communication.js index b67e2c7..345714f 100644 --- a/src/communication.js +++ b/src/communication.js @@ -340,7 +340,7 @@ class Communication { throw new Errors.InternalInconsistencyError(verification.mode); } - await this.db.verificationComplete(dbCtx, verificationId, verification.callback, verification.topicId); + await this.db.verificationComplete(txCtx, verificationId, verification.callback, verification.topicId); }); // txCtx this.logger.info(_scope, 'verification complete', { ...logInfoData, verificationAccepted }); @@ -665,7 +665,7 @@ class Communication { if (wanted > 0) { // Update topics before anything else. const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - topicFetchPromises = topicFetchIds.map((id) => this.topicFetchProcess(dbCtx, id, requestId)); + topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId))); inProgress.push(...topicFetchPromises); wanted -= topicFetchPromises.length; } @@ -673,7 +673,7 @@ class Communication { if (wanted > 0) { // Then any pending verifications. const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - verificationPromises = verifications.map((id) => this.verificationProcess(dbCtx, id, requestId)); + verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId))); inProgress.push(...verificationPromises); wanted -= verificationPromises.length; } @@ -681,7 +681,7 @@ class Communication { if (wanted > 0) { // Finally dole out content. const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - updatePromises = updates.map((id) => this.subscriptionDeliveryProcess(dbCtx, id, requestId)); + updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId))); inProgress.push(...updatePromises); wanted -= updatePromises.length; } diff --git a/src/worker.js b/src/worker.js index 4f566ad..ebf8914 100644 --- a/src/worker.js +++ b/src/worker.js @@ -189,7 +189,6 @@ class Worker { // Interrupt any pending sleep, if we were called out of timeout-cycle. clearTimeout(this.nextTimeout); - // Share one db connection for all tasks. try { await this.db.context(async (dbCtx) => {