From 9f9d3c81cc0960f03e6598258d36ad828058f65f Mon Sep 17 00:00:00 2001 From: Justin Wind Date: Fri, 13 Aug 2021 11:57:20 -0700 Subject: [PATCH] worker tasks now share one db context, rather than one per task When using Postgres, each context is a connection. Moved the context creation into the worker so that all tasks can use one connection when its processing, which is much healthier for the database. --- CHANGELOG.md | 4 +++ README.md | 1 + src/communication.js | 51 ++++++++++++++--------------- src/worker.js | 69 +++++++++++++++++++++++---------------- test/src/communication.js | 10 +++--- test/src/worker.js | 28 ++++++++++++---- 6 files changed, 97 insertions(+), 66 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 71d4dfe..7e97c9c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ Releases and notable changes to this project are documented here. ## [Unreleased] +### Fixed + +- Worker tasks, such as delivering content updates, now share one database context. This reduces the connection load for Postgres backends, affording greater scalability. + ## [v1.1.2] - 2021-08-11 ### Added diff --git a/README.md b/README.md index 32ca1bb..c7e805a 100644 --- a/README.md +++ b/README.md @@ -135,6 +135,7 @@ This implementation is built atop an in-house API framework, for Reasons. It wo - schema-version-helper.js - schema migrations aide - postgres/ - index.js - PostgreSQL implementation + - listener.js - notify/listen connection to support topic content caching - sql/ - statements and schemas - sqlite/ - index.js - SQLite implementation diff --git a/src/communication.js b/src/communication.js index 72f2642..b67e2c7 100644 --- a/src/communication.js +++ b/src/communication.js @@ -49,7 +49,7 @@ class Communication { return response; }); - this.worker = new Worker(logger, this.workFeed.bind(this), options); + this.worker = new Worker(logger, db, this.workFeed.bind(this), options); this.worker.start(); } @@ -647,10 +647,11 @@ class Communication { /** * + * @param {*} dbCtx * @param {Number} wanted maximum tasks to claim * @returns {Promise[]} */ - async workFeed(wanted) { + async workFeed(dbCtx, wanted) { const _scope = _fileScope('workFeed'); const inProgress = []; const requestId = common.requestId(); @@ -661,33 +662,29 @@ class Communication { this.logger.debug(_scope, 'called', { wanted }); try { - await this.db.context(async (dbCtx) => { - if (wanted > 0) { - // Update topics before anything else. - const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - // Each task gets a new context, as these map to connections in some dbs. - // This dbCtx goes away after launching the processing functions, so would not be available to tasks. - topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId))); - inProgress.push(...topicFetchPromises); - wanted -= topicFetchPromises.length; - } + if (wanted > 0) { + // Update topics before anything else. + const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); + topicFetchPromises = topicFetchIds.map((id) => this.topicFetchProcess(dbCtx, id, requestId)); + inProgress.push(...topicFetchPromises); + wanted -= topicFetchPromises.length; + } - if (wanted > 0) { - // Then any pending verifications. - const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId))); - inProgress.push(...verificationPromises); - wanted -= verificationPromises.length; - } + if (wanted > 0) { + // Then any pending verifications. + const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); + verificationPromises = verifications.map((id) => this.verificationProcess(dbCtx, id, requestId)); + inProgress.push(...verificationPromises); + wanted -= verificationPromises.length; + } - if (wanted > 0) { - // Finally dole out content. - const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId))); - inProgress.push(...updatePromises); - wanted -= updatePromises.length; - } - }); // dbCtx + if (wanted > 0) { + // Finally dole out content. + const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); + updatePromises = updates.map((id) => this.subscriptionDeliveryProcess(dbCtx, id, requestId)); + inProgress.push(...updatePromises); + wanted -= updatePromises.length; + } } catch (e) { this.logger.error(_scope, 'failed', { error: e }); // do not re-throw, return what we've claimed so far diff --git a/src/worker.js b/src/worker.js index ca77369..9b6fd7a 100644 --- a/src/worker.js +++ b/src/worker.js @@ -12,22 +12,25 @@ const _fileScope = common.fileScope(__filename); /** * @callback Worker~promiseGiver + * @param {*} dbCtx * @param {number} atMost * @returns {Promise[]} */ class Worker { /** - * @param {object} logger - * @param {Worker~promiseGiver} promiseGiver + * @param {object} logger + * @param {object} db + * @param {Worker~promiseGiver} promiseGiver * @param {object} options * @param {object} options.worker * @param {object} options.worker.pollingEnabled * @param {number} options.worker.recurrSleepMs * @param {number} options.worker.concurrency */ - constructor(logger, promiseGiver, options) { + constructor(logger, db, promiseGiver, options) { this.logger = logger; + this.db = db; this.options = options; if (!promiseGiver || typeof promiseGiver !== 'function') { throw new TypeError('function required'); @@ -129,14 +132,15 @@ class Worker { /** * Refill the workpool with our special promises. + * @param {*} dbCtx * @returns {Promise[]} */ - async _getWork() { + async _getWork(dbCtx) { const _scope = _fileScope('_getWork'); let newPromises = []; const wanted = this.concurrency - this.inFlight.length; if (wanted > 0) { - newPromises = await this.promiseGiver(wanted); + newPromises = await this.promiseGiver(dbCtx, wanted); newPromises = newPromises.map((p) => Worker.watchedPromise(p)); common.stackSafePush(this.inFlight, newPromises); } @@ -178,29 +182,38 @@ class Worker { // Interrupt any pending sleep, if we were called out of timeout-cycle. clearTimeout(this.nextTimeout); - // Try to fill the hopper - await this._getWork(); - - while (this.inFlight.length > 0) { - /* Wait for one or more to be resolved. - * We don't care what the result was, as we have to scan the list - * for all settled promises anyhow, and our wrapper has stored the - * results. - */ - try { - await Promise.race(this.inFlight); - } catch (e) { - // NOP here, as we'll handle it when we scan the list - } - this.logger.debug(_scope, { msg: 'race completed' }); - - // Address settled promises.. - const settled = this._handleWatchedList(this._watchedHandler.bind(this)); - this.logger.debug(_scope, { settled }); - - // Try to fill the vacancy - // TODO: maybe rate-limit this call based on slot availability - await this._getWork(); + // Share one db connection for all tasks. + try { + await this.db.context(async (dbCtx) => { + + // Try to fill the hopper + await this._getWork(dbCtx); + + while (this.inFlight.length > 0) { + /* Wait for one or more to be resolved. + * We don't care what the result was, as we have to scan the list + * for all settled promises anyhow, and our wrapper has stored the + * results. + */ + try { + await Promise.race(this.inFlight); + } catch (e) { + // NOP here, as we'll handle it when we scan the list + } + this.logger.debug(_scope, { msg: 'race completed' }); + + // Address settled promises.. + const settled = this._handleWatchedList(this._watchedHandler.bind(this)); + this.logger.debug(_scope, { settled }); + + // Try to fill the vacancy + // TODO: maybe rate-limit this call based on slot availability + await this._getWork(dbCtx); + } + }); // dbCtx + } catch (e) { + this.logger.error(_scope, 'failed', { error: e }); + // Try again later anyhow. } // No more work, wait a while and retry diff --git a/test/src/communication.js b/test/src/communication.js index 7c62227..ca5f34e 100644 --- a/test/src/communication.js +++ b/test/src/communication.js @@ -837,8 +837,9 @@ describe('Communication', function () { }); // verificationClaimAndProcessById describe('workFeed', function () { - let wanted; + let stubCtx, wanted; beforeEach(function () { + stubCtx = {}; sinon.stub(communication, 'topicFetchProcess'); sinon.stub(communication, 'verificationProcess'); sinon.stub(communication, 'subscriptionDeliveryProcess'); @@ -853,12 +854,13 @@ describe('Communication', function () { const expectedLength = [topicIds, verificationIds, subscriptionIds].map((x) => x.length).reduce((a, b) => a + b, 0); wanted = 10; - const result = await communication.workFeed(wanted); + const result = await communication.workFeed(stubCtx, wanted); assert.strictEqual(result.length, expectedLength); }); it('covers no wanted work', async function () { - const result = await communication.workFeed(0); + wanted = 0; + const result = await communication.workFeed(stubCtx, wanted); assert.strictEqual(result.length, 0); assert(!communication.db.topicFetchClaim.called); assert(!communication.db.verificationClaim.called); @@ -871,7 +873,7 @@ describe('Communication', function () { const expectedLength = topicIds.length; wanted = 10; - const result = await communication.workFeed(wanted); + const result = await communication.workFeed(stubCtx, wanted); assert.strictEqual(result.length, expectedLength); }); diff --git a/test/src/worker.js b/test/src/worker.js index a9047a0..32bd065 100644 --- a/test/src/worker.js +++ b/test/src/worker.js @@ -8,6 +8,7 @@ const Worker = require('../../src/worker'); const Config = require('../../config'); const stubLogger = require('../stub-logger'); +const stubDb = require('../stub-db'); const noExpectedException = 'did not get expected exception'; @@ -19,7 +20,8 @@ describe('Worker', function () { beforeEach(function () { config = new Config('test'); promiseGiver = sinon.stub(); - worker = new Worker(stubLogger, promiseGiver, config); + worker = new Worker(stubLogger, stubDb, promiseGiver, config); + stubDb._reset(); }); afterEach(function () { @@ -33,7 +35,7 @@ describe('Worker', function () { it('requires a promiseGiver function', function () { try { - worker = new Worker(stubLogger, undefined, config); + worker = new Worker(stubLogger, stubDb, undefined, config); assert.fail('should require function argument'); } catch (e) { assert(e instanceof TypeError); @@ -44,13 +46,13 @@ describe('Worker', function () { describe('start', function () { it('starts without polling', function () { config.worker.pollingEnabled = false; - worker = new Worker(stubLogger, promiseGiver, config); + worker = new Worker(stubLogger, stubDb, promiseGiver, config); worker.start(); assert.strictEqual(worker.running, false); }); it('starts with polling', function () { config.worker.pollingEnabled = true; - worker = new Worker(stubLogger, promiseGiver, config); + worker = new Worker(stubLogger, stubDb, promiseGiver, config); sinon.stub(worker, '_recurr'); worker.start(); clearTimeout(worker.nextTimeout); @@ -60,7 +62,7 @@ describe('Worker', function () { describe('stop', function () { it('stops', function () { - worker = new Worker(stubLogger, promiseGiver, config); + worker = new Worker(stubLogger, stubDb, promiseGiver, config); worker.start(); worker.stop(); assert.strictEqual(worker.running, false); @@ -124,6 +126,10 @@ describe('Worker', function () { }); // _handleWatchedList describe('_getWork', function () { + let stubCtx; + beforeEach(function () { + stubCtx = {}; + }); it('gets tasks', async function () { const expected = [ Promise.resolve('first'), @@ -131,7 +137,7 @@ describe('Worker', function () { Promise.resolve('second'), ]; worker.promiseGiver.resolves(expected); - const result = await worker._getWork(); + const result = await worker._getWork(stubCtx); assert.deepStrictEqual(result, expected); assert.strictEqual(worker.inFlight.length, expected.length); }); @@ -142,7 +148,7 @@ describe('Worker', function () { Promise.reject('bad'), Promise.resolve('second'), ]; - const result = await worker._getWork(); + const result = await worker._getWork(stubCtx); assert(!worker.promiseGiver.called); assert.deepStrictEqual(result, []); }); @@ -190,6 +196,14 @@ describe('Worker', function () { assert.strictEqual(worker._getWork.callCount, 1); assert.strictEqual(worker._recurr.callCount, 1); }); + it('covers error', async function () { + const expected = new Error('blah'); + stubDb.context.restore(); + sinon.stub(stubDb, 'context').rejects(expected); + await worker.process(); + assert.strictEqual(worker._getWork.callCount, 0); + assert.strictEqual(worker._recurr.callCount, 1); + }); }); // process }); // Worker -- 2.43.2