return response;
});
- this.worker = new Worker(logger, this.workFeed.bind(this), options);
+ this.worker = new Worker(logger, db, this.workFeed.bind(this), options);
this.worker.start();
}
/**
*
+ * @param {*} dbCtx
* @param {Number} wanted maximum tasks to claim
* @returns {Promise<void>[]}
*/
- async workFeed(wanted) {
+ async workFeed(dbCtx, wanted) {
const _scope = _fileScope('workFeed');
const inProgress = [];
const requestId = common.requestId();
this.logger.debug(_scope, 'called', { wanted });
try {
- await this.db.context(async (dbCtx) => {
- if (wanted > 0) {
- // Update topics before anything else.
- const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
- // Each task gets a new context, as these map to connections in some dbs.
- // This dbCtx goes away after launching the processing functions, so would not be available to tasks.
- topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId)));
- inProgress.push(...topicFetchPromises);
- wanted -= topicFetchPromises.length;
- }
+ if (wanted > 0) {
+ // Update topics before anything else.
+ const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
+ topicFetchPromises = topicFetchIds.map((id) => this.topicFetchProcess(dbCtx, id, requestId));
+ inProgress.push(...topicFetchPromises);
+ wanted -= topicFetchPromises.length;
+ }
- if (wanted > 0) {
- // Then any pending verifications.
- const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
- verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId)));
- inProgress.push(...verificationPromises);
- wanted -= verificationPromises.length;
- }
+ if (wanted > 0) {
+ // Then any pending verifications.
+ const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
+ verificationPromises = verifications.map((id) => this.verificationProcess(dbCtx, id, requestId));
+ inProgress.push(...verificationPromises);
+ wanted -= verificationPromises.length;
+ }
- if (wanted > 0) {
- // Finally dole out content.
- const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
- updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId)));
- inProgress.push(...updatePromises);
- wanted -= updatePromises.length;
- }
- }); // dbCtx
+ if (wanted > 0) {
+ // Finally dole out content.
+ const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
+ updatePromises = updates.map((id) => this.subscriptionDeliveryProcess(dbCtx, id, requestId));
+ inProgress.push(...updatePromises);
+ wanted -= updatePromises.length;
+ }
} catch (e) {
this.logger.error(_scope, 'failed', { error: e });
// do not re-throw, return what we've claimed so far