X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;ds=sidebyside;f=src%2Fcommunication.js;h=345714f328951f4f7964249a8884882390bb4abc;hb=8ba8751c41f56f0d97f3b694f89ebef568497530;hp=b67e2c731ff122e09f4e68557a8ddf0b1dec927c;hpb=ed6dc5a66ce0eaf2dd61f9fb7a5ec048944c68ee;p=websub-hub diff --git a/src/communication.js b/src/communication.js index b67e2c7..345714f 100644 --- a/src/communication.js +++ b/src/communication.js @@ -340,7 +340,7 @@ class Communication { throw new Errors.InternalInconsistencyError(verification.mode); } - await this.db.verificationComplete(dbCtx, verificationId, verification.callback, verification.topicId); + await this.db.verificationComplete(txCtx, verificationId, verification.callback, verification.topicId); }); // txCtx this.logger.info(_scope, 'verification complete', { ...logInfoData, verificationAccepted }); @@ -665,7 +665,7 @@ class Communication { if (wanted > 0) { // Update topics before anything else. const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - topicFetchPromises = topicFetchIds.map((id) => this.topicFetchProcess(dbCtx, id, requestId)); + topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId))); inProgress.push(...topicFetchPromises); wanted -= topicFetchPromises.length; } @@ -673,7 +673,7 @@ class Communication { if (wanted > 0) { // Then any pending verifications. const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - verificationPromises = verifications.map((id) => this.verificationProcess(dbCtx, id, requestId)); + verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId))); inProgress.push(...verificationPromises); wanted -= verificationPromises.length; } @@ -681,7 +681,7 @@ class Communication { if (wanted > 0) { // Finally dole out content. const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - updatePromises = updates.map((id) => this.subscriptionDeliveryProcess(dbCtx, id, requestId)); + updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId))); inProgress.push(...updatePromises); wanted -= updatePromises.length; }