X-Git-Url: https://git.squeep.com/?a=blobdiff_plain;f=src%2Fcommunication.js;h=345714f328951f4f7964249a8884882390bb4abc;hb=009a27af38e750695d11e6699c1d20928ed5f73d;hp=b67e2c731ff122e09f4e68557a8ddf0b1dec927c;hpb=9f9d3c81cc0960f03e6598258d36ad828058f65f;p=websub-hub diff --git a/src/communication.js b/src/communication.js index b67e2c7..345714f 100644 --- a/src/communication.js +++ b/src/communication.js @@ -340,7 +340,7 @@ class Communication { throw new Errors.InternalInconsistencyError(verification.mode); } - await this.db.verificationComplete(dbCtx, verificationId, verification.callback, verification.topicId); + await this.db.verificationComplete(txCtx, verificationId, verification.callback, verification.topicId); }); // txCtx this.logger.info(_scope, 'verification complete', { ...logInfoData, verificationAccepted }); @@ -665,7 +665,7 @@ class Communication { if (wanted > 0) { // Update topics before anything else. const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - topicFetchPromises = topicFetchIds.map((id) => this.topicFetchProcess(dbCtx, id, requestId)); + topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId))); inProgress.push(...topicFetchPromises); wanted -= topicFetchPromises.length; } @@ -673,7 +673,7 @@ class Communication { if (wanted > 0) { // Then any pending verifications. const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - verificationPromises = verifications.map((id) => this.verificationProcess(dbCtx, id, requestId)); + verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId))); inProgress.push(...verificationPromises); wanted -= verificationPromises.length; } @@ -681,7 +681,7 @@ class Communication { if (wanted > 0) { // Finally dole out content. const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - updatePromises = updates.map((id) => this.subscriptionDeliveryProcess(dbCtx, id, requestId)); + updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId))); inProgress.push(...updatePromises); wanted -= updatePromises.length; }