throw new Errors.InternalInconsistencyError(verification.mode);
}
- await this.db.verificationComplete(dbCtx, verificationId, verification.callback, verification.topicId);
+ await this.db.verificationComplete(txCtx, verificationId, verification.callback, verification.topicId);
}); // txCtx
this.logger.info(_scope, 'verification complete', { ...logInfoData, verificationAccepted });
if (wanted > 0) {
// Update topics before anything else.
const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
- topicFetchPromises = topicFetchIds.map((id) => this.topicFetchProcess(dbCtx, id, requestId));
+ topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId)));
inProgress.push(...topicFetchPromises);
wanted -= topicFetchPromises.length;
}
if (wanted > 0) {
// Then any pending verifications.
const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
- verificationPromises = verifications.map((id) => this.verificationProcess(dbCtx, id, requestId));
+ verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId)));
inProgress.push(...verificationPromises);
wanted -= verificationPromises.length;
}
if (wanted > 0) {
// Finally dole out content.
const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
- updatePromises = updates.map((id) => this.subscriptionDeliveryProcess(dbCtx, id, requestId));
+ updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId)));
inProgress.push(...updatePromises);
wanted -= updatePromises.length;
}