From: Justin Wind Date: Fri, 13 Aug 2021 22:55:14 +0000 (-0700) Subject: Merge branch 'v1.1-dev' as v1.1.3 X-Git-Tag: v1.1.3 X-Git-Url: https://git.squeep.com/?a=commitdiff_plain;h=c4d2acfc78cc8b67649c2eaa60a8c6c34c3e6675;hp=409ff988982a5edfcd51c02c681187969db57d0a;p=websub-hub Merge branch 'v1.1-dev' as v1.1.3 --- diff --git a/CHANGELOG.md b/CHANGELOG.md index 71d4dfe..0378b17 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ Releases and notable changes to this project are documented here. ## [Unreleased] +## [v1.1.3] - 2021-08-13 + +### Fixed + +- Worker tasks, such as delivering content updates, now share one database context. This reduces the connection load for Postgres backends, affording greater scalability. + ## [v1.1.2] - 2021-08-11 ### Added @@ -35,7 +41,8 @@ Releases and notable changes to this project are documented here. --- -[Unreleased]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=HEAD;hp=v1.1.2 +[Unreleased]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=HEAD;hp=v1.1.3 +[v1.1.3]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=v1.1.3;hp=v1.1.2 [v1.1.2]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=v1.1.2;hp=v1.1.1 [v1.1.1]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=v1.1.1;hp=v1.1.0 [v1.1.0]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=v1.1.0;hp=v1.0.0 diff --git a/README.md b/README.md index 32ca1bb..c7e805a 100644 --- a/README.md +++ b/README.md @@ -135,6 +135,7 @@ This implementation is built atop an in-house API framework, for Reasons. It wo - schema-version-helper.js - schema migrations aide - postgres/ - index.js - PostgreSQL implementation + - listener.js - notify/listen connection to support topic content caching - sql/ - statements and schemas - sqlite/ - index.js - SQLite implementation diff --git a/package-lock.json b/package-lock.json index fe33cd0..bf05746 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "websub-hub", - "version": "1.1.2", + "version": "1.1.3", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index ef0eaf9..2b0d195 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "websub-hub", - "version": "1.1.2", + "version": "1.1.3", "description": "A WebSub Hub server implementation.", "main": "server.js", "scripts": { diff --git a/src/communication.js b/src/communication.js index 72f2642..b67e2c7 100644 --- a/src/communication.js +++ b/src/communication.js @@ -49,7 +49,7 @@ class Communication { return response; }); - this.worker = new Worker(logger, this.workFeed.bind(this), options); + this.worker = new Worker(logger, db, this.workFeed.bind(this), options); this.worker.start(); } @@ -647,10 +647,11 @@ class Communication { /** * + * @param {*} dbCtx * @param {Number} wanted maximum tasks to claim * @returns {Promise[]} */ - async workFeed(wanted) { + async workFeed(dbCtx, wanted) { const _scope = _fileScope('workFeed'); const inProgress = []; const requestId = common.requestId(); @@ -661,33 +662,29 @@ class Communication { this.logger.debug(_scope, 'called', { wanted }); try { - await this.db.context(async (dbCtx) => { - if (wanted > 0) { - // Update topics before anything else. - const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - // Each task gets a new context, as these map to connections in some dbs. - // This dbCtx goes away after launching the processing functions, so would not be available to tasks. - topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId))); - inProgress.push(...topicFetchPromises); - wanted -= topicFetchPromises.length; - } + if (wanted > 0) { + // Update topics before anything else. + const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); + topicFetchPromises = topicFetchIds.map((id) => this.topicFetchProcess(dbCtx, id, requestId)); + inProgress.push(...topicFetchPromises); + wanted -= topicFetchPromises.length; + } - if (wanted > 0) { - // Then any pending verifications. - const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId))); - inProgress.push(...verificationPromises); - wanted -= verificationPromises.length; - } + if (wanted > 0) { + // Then any pending verifications. + const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); + verificationPromises = verifications.map((id) => this.verificationProcess(dbCtx, id, requestId)); + inProgress.push(...verificationPromises); + wanted -= verificationPromises.length; + } - if (wanted > 0) { - // Finally dole out content. - const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId))); - inProgress.push(...updatePromises); - wanted -= updatePromises.length; - } - }); // dbCtx + if (wanted > 0) { + // Finally dole out content. + const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); + updatePromises = updates.map((id) => this.subscriptionDeliveryProcess(dbCtx, id, requestId)); + inProgress.push(...updatePromises); + wanted -= updatePromises.length; + } } catch (e) { this.logger.error(_scope, 'failed', { error: e }); // do not re-throw, return what we've claimed so far diff --git a/src/manager.js b/src/manager.js index 6fb187c..d7a07f1 100644 --- a/src/manager.js +++ b/src/manager.js @@ -342,7 +342,7 @@ class Manager { let fn, info, id; try { if (data.mode === Enum.Mode.Publish) { - fn = 'topicPublish'; + fn = 'topicFetchRequested'; info = await this.db.topicFetchRequested(dbCtx, data.topicId); id = data.topicId; } else { @@ -366,7 +366,7 @@ class Manager { fn = 'topicFetchClaimAndProcessById'; await this.communication.topicFetchClaimAndProcessById(dbCtx, id, requestId); } else { - fn = 'processVerification'; + fn = 'verificationClaimAndProcessById'; await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId); } } catch (e) { diff --git a/src/worker.js b/src/worker.js index ca77369..9b6fd7a 100644 --- a/src/worker.js +++ b/src/worker.js @@ -12,22 +12,25 @@ const _fileScope = common.fileScope(__filename); /** * @callback Worker~promiseGiver + * @param {*} dbCtx * @param {number} atMost * @returns {Promise[]} */ class Worker { /** - * @param {object} logger - * @param {Worker~promiseGiver} promiseGiver + * @param {object} logger + * @param {object} db + * @param {Worker~promiseGiver} promiseGiver * @param {object} options * @param {object} options.worker * @param {object} options.worker.pollingEnabled * @param {number} options.worker.recurrSleepMs * @param {number} options.worker.concurrency */ - constructor(logger, promiseGiver, options) { + constructor(logger, db, promiseGiver, options) { this.logger = logger; + this.db = db; this.options = options; if (!promiseGiver || typeof promiseGiver !== 'function') { throw new TypeError('function required'); @@ -129,14 +132,15 @@ class Worker { /** * Refill the workpool with our special promises. + * @param {*} dbCtx * @returns {Promise[]} */ - async _getWork() { + async _getWork(dbCtx) { const _scope = _fileScope('_getWork'); let newPromises = []; const wanted = this.concurrency - this.inFlight.length; if (wanted > 0) { - newPromises = await this.promiseGiver(wanted); + newPromises = await this.promiseGiver(dbCtx, wanted); newPromises = newPromises.map((p) => Worker.watchedPromise(p)); common.stackSafePush(this.inFlight, newPromises); } @@ -178,29 +182,38 @@ class Worker { // Interrupt any pending sleep, if we were called out of timeout-cycle. clearTimeout(this.nextTimeout); - // Try to fill the hopper - await this._getWork(); - - while (this.inFlight.length > 0) { - /* Wait for one or more to be resolved. - * We don't care what the result was, as we have to scan the list - * for all settled promises anyhow, and our wrapper has stored the - * results. - */ - try { - await Promise.race(this.inFlight); - } catch (e) { - // NOP here, as we'll handle it when we scan the list - } - this.logger.debug(_scope, { msg: 'race completed' }); - - // Address settled promises.. - const settled = this._handleWatchedList(this._watchedHandler.bind(this)); - this.logger.debug(_scope, { settled }); - - // Try to fill the vacancy - // TODO: maybe rate-limit this call based on slot availability - await this._getWork(); + // Share one db connection for all tasks. + try { + await this.db.context(async (dbCtx) => { + + // Try to fill the hopper + await this._getWork(dbCtx); + + while (this.inFlight.length > 0) { + /* Wait for one or more to be resolved. + * We don't care what the result was, as we have to scan the list + * for all settled promises anyhow, and our wrapper has stored the + * results. + */ + try { + await Promise.race(this.inFlight); + } catch (e) { + // NOP here, as we'll handle it when we scan the list + } + this.logger.debug(_scope, { msg: 'race completed' }); + + // Address settled promises.. + const settled = this._handleWatchedList(this._watchedHandler.bind(this)); + this.logger.debug(_scope, { settled }); + + // Try to fill the vacancy + // TODO: maybe rate-limit this call based on slot availability + await this._getWork(dbCtx); + } + }); // dbCtx + } catch (e) { + this.logger.error(_scope, 'failed', { error: e }); + // Try again later anyhow. } // No more work, wait a while and retry diff --git a/test/src/communication.js b/test/src/communication.js index 7c62227..ca5f34e 100644 --- a/test/src/communication.js +++ b/test/src/communication.js @@ -837,8 +837,9 @@ describe('Communication', function () { }); // verificationClaimAndProcessById describe('workFeed', function () { - let wanted; + let stubCtx, wanted; beforeEach(function () { + stubCtx = {}; sinon.stub(communication, 'topicFetchProcess'); sinon.stub(communication, 'verificationProcess'); sinon.stub(communication, 'subscriptionDeliveryProcess'); @@ -853,12 +854,13 @@ describe('Communication', function () { const expectedLength = [topicIds, verificationIds, subscriptionIds].map((x) => x.length).reduce((a, b) => a + b, 0); wanted = 10; - const result = await communication.workFeed(wanted); + const result = await communication.workFeed(stubCtx, wanted); assert.strictEqual(result.length, expectedLength); }); it('covers no wanted work', async function () { - const result = await communication.workFeed(0); + wanted = 0; + const result = await communication.workFeed(stubCtx, wanted); assert.strictEqual(result.length, 0); assert(!communication.db.topicFetchClaim.called); assert(!communication.db.verificationClaim.called); @@ -871,7 +873,7 @@ describe('Communication', function () { const expectedLength = topicIds.length; wanted = 10; - const result = await communication.workFeed(wanted); + const result = await communication.workFeed(stubCtx, wanted); assert.strictEqual(result.length, expectedLength); }); diff --git a/test/src/worker.js b/test/src/worker.js index a9047a0..32bd065 100644 --- a/test/src/worker.js +++ b/test/src/worker.js @@ -8,6 +8,7 @@ const Worker = require('../../src/worker'); const Config = require('../../config'); const stubLogger = require('../stub-logger'); +const stubDb = require('../stub-db'); const noExpectedException = 'did not get expected exception'; @@ -19,7 +20,8 @@ describe('Worker', function () { beforeEach(function () { config = new Config('test'); promiseGiver = sinon.stub(); - worker = new Worker(stubLogger, promiseGiver, config); + worker = new Worker(stubLogger, stubDb, promiseGiver, config); + stubDb._reset(); }); afterEach(function () { @@ -33,7 +35,7 @@ describe('Worker', function () { it('requires a promiseGiver function', function () { try { - worker = new Worker(stubLogger, undefined, config); + worker = new Worker(stubLogger, stubDb, undefined, config); assert.fail('should require function argument'); } catch (e) { assert(e instanceof TypeError); @@ -44,13 +46,13 @@ describe('Worker', function () { describe('start', function () { it('starts without polling', function () { config.worker.pollingEnabled = false; - worker = new Worker(stubLogger, promiseGiver, config); + worker = new Worker(stubLogger, stubDb, promiseGiver, config); worker.start(); assert.strictEqual(worker.running, false); }); it('starts with polling', function () { config.worker.pollingEnabled = true; - worker = new Worker(stubLogger, promiseGiver, config); + worker = new Worker(stubLogger, stubDb, promiseGiver, config); sinon.stub(worker, '_recurr'); worker.start(); clearTimeout(worker.nextTimeout); @@ -60,7 +62,7 @@ describe('Worker', function () { describe('stop', function () { it('stops', function () { - worker = new Worker(stubLogger, promiseGiver, config); + worker = new Worker(stubLogger, stubDb, promiseGiver, config); worker.start(); worker.stop(); assert.strictEqual(worker.running, false); @@ -124,6 +126,10 @@ describe('Worker', function () { }); // _handleWatchedList describe('_getWork', function () { + let stubCtx; + beforeEach(function () { + stubCtx = {}; + }); it('gets tasks', async function () { const expected = [ Promise.resolve('first'), @@ -131,7 +137,7 @@ describe('Worker', function () { Promise.resolve('second'), ]; worker.promiseGiver.resolves(expected); - const result = await worker._getWork(); + const result = await worker._getWork(stubCtx); assert.deepStrictEqual(result, expected); assert.strictEqual(worker.inFlight.length, expected.length); }); @@ -142,7 +148,7 @@ describe('Worker', function () { Promise.reject('bad'), Promise.resolve('second'), ]; - const result = await worker._getWork(); + const result = await worker._getWork(stubCtx); assert(!worker.promiseGiver.called); assert.deepStrictEqual(result, []); }); @@ -190,6 +196,14 @@ describe('Worker', function () { assert.strictEqual(worker._getWork.callCount, 1); assert.strictEqual(worker._recurr.callCount, 1); }); + it('covers error', async function () { + const expected = new Error('blah'); + stubDb.context.restore(); + sinon.stub(stubDb, 'context').rejects(expected); + await worker.process(); + assert.strictEqual(worker._getWork.callCount, 0); + assert.strictEqual(worker._recurr.callCount, 1); + }); }); // process }); // Worker