X-Git-Url: https://git.squeep.com/?a=blobdiff_plain;f=src%2Fcommunication.js;h=b67e2c731ff122e09f4e68557a8ddf0b1dec927c;hb=9f9d3c81cc0960f03e6598258d36ad828058f65f;hp=72f2642a003e465aab42909ce13f90b8fd93694c;hpb=5e7ab1d9ec27aab6e321c5a647cc1717980d331e;p=websub-hub diff --git a/src/communication.js b/src/communication.js index 72f2642..b67e2c7 100644 --- a/src/communication.js +++ b/src/communication.js @@ -49,7 +49,7 @@ class Communication { return response; }); - this.worker = new Worker(logger, this.workFeed.bind(this), options); + this.worker = new Worker(logger, db, this.workFeed.bind(this), options); this.worker.start(); } @@ -647,10 +647,11 @@ class Communication { /** * + * @param {*} dbCtx * @param {Number} wanted maximum tasks to claim * @returns {Promise[]} */ - async workFeed(wanted) { + async workFeed(dbCtx, wanted) { const _scope = _fileScope('workFeed'); const inProgress = []; const requestId = common.requestId(); @@ -661,33 +662,29 @@ class Communication { this.logger.debug(_scope, 'called', { wanted }); try { - await this.db.context(async (dbCtx) => { - if (wanted > 0) { - // Update topics before anything else. - const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - // Each task gets a new context, as these map to connections in some dbs. - // This dbCtx goes away after launching the processing functions, so would not be available to tasks. - topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId))); - inProgress.push(...topicFetchPromises); - wanted -= topicFetchPromises.length; - } + if (wanted > 0) { + // Update topics before anything else. + const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); + topicFetchPromises = topicFetchIds.map((id) => this.topicFetchProcess(dbCtx, id, requestId)); + inProgress.push(...topicFetchPromises); + wanted -= topicFetchPromises.length; + } - if (wanted > 0) { - // Then any pending verifications. - const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId))); - inProgress.push(...verificationPromises); - wanted -= verificationPromises.length; - } + if (wanted > 0) { + // Then any pending verifications. + const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); + verificationPromises = verifications.map((id) => this.verificationProcess(dbCtx, id, requestId)); + inProgress.push(...verificationPromises); + wanted -= verificationPromises.length; + } - if (wanted > 0) { - // Finally dole out content. - const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId))); - inProgress.push(...updatePromises); - wanted -= updatePromises.length; - } - }); // dbCtx + if (wanted > 0) { + // Finally dole out content. + const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); + updatePromises = updates.map((id) => this.subscriptionDeliveryProcess(dbCtx, id, requestId)); + inProgress.push(...updatePromises); + wanted -= updatePromises.length; + } } catch (e) { this.logger.error(_scope, 'failed', { error: e }); // do not re-throw, return what we've claimed so far