## [Unreleased]
+### Fixed
+
+- Worker tasks, such as delivering content updates, now share one database context. This reduces the connection load for Postgres backends, improving scalability (see the sketch below).
+
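A minimal sketch of the pattern this entry describes, for illustration only: a single `db.context` call (one connection on a Postgres backend) wraps an entire work batch, and every claimed task reuses that same `dbCtx` instead of opening its own context. The standalone `runBatch` wrapper and its parameters are hypothetical; the `db.context` callback shape and the `workFeed(dbCtx, wanted)` signature follow the diff below.

```js
// Hypothetical wrapper illustrating the shared-context pattern; `db.context`
// and `workFeed(dbCtx, wanted)` mirror the diff below, the rest is assumed.
async function runBatch(db, workFeed, wanted) {
  await db.context(async (dbCtx) => {
    // All claimed tasks reuse this single context/connection...
    const tasks = await workFeed(dbCtx, wanted);
    // ...and the context stays open until every task has settled.
    await Promise.allSettled(tasks);
  });
}
```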
## [v1.1.2] - 2021-08-11
### Added
- schema-version-helper.js - schema migrations aide
- postgres/
- index.js - PostgreSQL implementation
+ - listener.js - notify/listen connection to support topic content caching
- sql/ - statements and schemas
- sqlite/
- index.js - SQLite implementation
return response;
});
- this.worker = new Worker(logger, this.workFeed.bind(this), options);
+ this.worker = new Worker(logger, db, this.workFeed.bind(this), options);
this.worker.start();
}
/**
*
+ * @param {*} dbCtx
* @param {Number} wanted maximum tasks to claim
* @returns {Promise<void>[]}
*/
- async workFeed(wanted) {
+ async workFeed(dbCtx, wanted) {
const _scope = _fileScope('workFeed');
const inProgress = [];
const requestId = common.requestId();
this.logger.debug(_scope, 'called', { wanted });
try {
- await this.db.context(async (dbCtx) => {
- if (wanted > 0) {
- // Update topics before anything else.
- const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
- // Each task gets a new context, as these map to connections in some dbs.
- // This dbCtx goes away after launching the processing functions, so would not be available to tasks.
- topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId)));
- inProgress.push(...topicFetchPromises);
- wanted -= topicFetchPromises.length;
- }
+ if (wanted > 0) {
+ // Update topics before anything else.
+ const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
+ topicFetchPromises = topicFetchIds.map((id) => this.topicFetchProcess(dbCtx, id, requestId));
+ inProgress.push(...topicFetchPromises);
+ wanted -= topicFetchPromises.length;
+ }
- if (wanted > 0) {
- // Then any pending verifications.
- const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
- verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId)));
- inProgress.push(...verificationPromises);
- wanted -= verificationPromises.length;
- }
+ if (wanted > 0) {
+ // Then any pending verifications.
+ const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
+ verificationPromises = verifications.map((id) => this.verificationProcess(dbCtx, id, requestId));
+ inProgress.push(...verificationPromises);
+ wanted -= verificationPromises.length;
+ }
- if (wanted > 0) {
- // Finally dole out content.
- const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
- updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId)));
- inProgress.push(...updatePromises);
- wanted -= updatePromises.length;
- }
- }); // dbCtx
+ if (wanted > 0) {
+ // Finally dole out content.
+ const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
+ updatePromises = updates.map((id) => this.subscriptionDeliveryProcess(dbCtx, id, requestId));
+ inProgress.push(...updatePromises);
+ wanted -= updatePromises.length;
+ }
} catch (e) {
this.logger.error(_scope, 'failed', { error: e });
// do not re-throw, return what we've claimed so far
/**
* @callback Worker~promiseGiver
+ * @param {*} dbCtx
* @param {number} atMost
* @returns {Promise<void>[]}
*/
class Worker {
/**
- * @param {object} logger
- * @param {Worker~promiseGiver} promiseGiver
+ * @param {object} logger
+ * @param {object} db
+ * @param {Worker~promiseGiver} promiseGiver
* @param {object} options
* @param {object} options.worker
* @param {object} options.worker.pollingEnabled
* @param {number} options.worker.recurrSleepMs
* @param {number} options.worker.concurrency
*/
- constructor(logger, promiseGiver, options) {
+ constructor(logger, db, promiseGiver, options) {
this.logger = logger;
+ this.db = db;
this.options = options;
if (!promiseGiver || typeof promiseGiver !== 'function') {
throw new TypeError('function required');
/**
* Refill the workpool with our special promises.
+ * @param {*} dbCtx
* @returns {Promise[]}
*/
- async _getWork() {
+ async _getWork(dbCtx) {
const _scope = _fileScope('_getWork');
let newPromises = [];
const wanted = this.concurrency - this.inFlight.length;
if (wanted > 0) {
- newPromises = await this.promiseGiver(wanted);
+ newPromises = await this.promiseGiver(dbCtx, wanted);
newPromises = newPromises.map((p) => Worker.watchedPromise(p));
common.stackSafePush(this.inFlight, newPromises);
}
// Interrupt any pending sleep, if we were called out of timeout-cycle.
clearTimeout(this.nextTimeout);
- // Try to fill the hopper
- await this._getWork();
-
- while (this.inFlight.length > 0) {
- /* Wait for one or more to be resolved.
- * We don't care what the result was, as we have to scan the list
- * for all settled promises anyhow, and our wrapper has stored the
- * results.
- */
- try {
- await Promise.race(this.inFlight);
- } catch (e) {
- // NOP here, as we'll handle it when we scan the list
- }
- this.logger.debug(_scope, { msg: 'race completed' });
-
- // Address settled promises..
- const settled = this._handleWatchedList(this._watchedHandler.bind(this));
- this.logger.debug(_scope, { settled });
-
- // Try to fill the vacancy
- // TODO: maybe rate-limit this call based on slot availability
- await this._getWork();
+ // Share one db connection for all tasks.
+ try {
+ await this.db.context(async (dbCtx) => {
+
+ // Try to fill the hopper
+ await this._getWork(dbCtx);
+
+ while (this.inFlight.length > 0) {
+ /* Wait for one or more to be resolved.
+ * We don't care what the result was, as we have to scan the list
+ * for all settled promises anyhow, and our wrapper has stored the
+ * results.
+ */
+ try {
+ await Promise.race(this.inFlight);
+ } catch (e) {
+ // NOP here, as we'll handle it when we scan the list
+ }
+ this.logger.debug(_scope, { msg: 'race completed' });
+
+        // Address settled promises.
+ const settled = this._handleWatchedList(this._watchedHandler.bind(this));
+ this.logger.debug(_scope, { settled });
+
+ // Try to fill the vacancy
+ // TODO: maybe rate-limit this call based on slot availability
+ await this._getWork(dbCtx);
+ }
+ }); // dbCtx
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e });
+ // Try again later anyhow.
}
// No more work, wait a while and retry
}); // verificationClaimAndProcessById
describe('workFeed', function () {
- let wanted;
+ let stubCtx, wanted;
beforeEach(function () {
+ stubCtx = {};
sinon.stub(communication, 'topicFetchProcess');
sinon.stub(communication, 'verificationProcess');
sinon.stub(communication, 'subscriptionDeliveryProcess');
const expectedLength = [topicIds, verificationIds, subscriptionIds].map((x) => x.length).reduce((a, b) => a + b, 0);
wanted = 10;
- const result = await communication.workFeed(wanted);
+ const result = await communication.workFeed(stubCtx, wanted);
assert.strictEqual(result.length, expectedLength);
});
it('covers no wanted work', async function () {
- const result = await communication.workFeed(0);
+ wanted = 0;
+ const result = await communication.workFeed(stubCtx, wanted);
assert.strictEqual(result.length, 0);
assert(!communication.db.topicFetchClaim.called);
assert(!communication.db.verificationClaim.called);
const expectedLength = topicIds.length;
wanted = 10;
- const result = await communication.workFeed(wanted);
+ const result = await communication.workFeed(stubCtx, wanted);
assert.strictEqual(result.length, expectedLength);
});
const Config = require('../../config');
const stubLogger = require('../stub-logger');
+const stubDb = require('../stub-db');
const noExpectedException = 'did not get expected exception';
beforeEach(function () {
config = new Config('test');
promiseGiver = sinon.stub();
- worker = new Worker(stubLogger, promiseGiver, config);
+ worker = new Worker(stubLogger, stubDb, promiseGiver, config);
+ stubDb._reset();
});
afterEach(function () {
it('requires a promiseGiver function', function () {
try {
- worker = new Worker(stubLogger, undefined, config);
+ worker = new Worker(stubLogger, stubDb, undefined, config);
assert.fail('should require function argument');
} catch (e) {
assert(e instanceof TypeError);
describe('start', function () {
it('starts without polling', function () {
config.worker.pollingEnabled = false;
- worker = new Worker(stubLogger, promiseGiver, config);
+ worker = new Worker(stubLogger, stubDb, promiseGiver, config);
worker.start();
assert.strictEqual(worker.running, false);
});
it('starts with polling', function () {
config.worker.pollingEnabled = true;
- worker = new Worker(stubLogger, promiseGiver, config);
+ worker = new Worker(stubLogger, stubDb, promiseGiver, config);
sinon.stub(worker, '_recurr');
worker.start();
clearTimeout(worker.nextTimeout);
describe('stop', function () {
it('stops', function () {
- worker = new Worker(stubLogger, promiseGiver, config);
+ worker = new Worker(stubLogger, stubDb, promiseGiver, config);
worker.start();
worker.stop();
assert.strictEqual(worker.running, false);
}); // _handleWatchedList
describe('_getWork', function () {
+ let stubCtx;
+ beforeEach(function () {
+ stubCtx = {};
+ });
it('gets tasks', async function () {
const expected = [
Promise.resolve('first'),
Promise.resolve('second'),
];
worker.promiseGiver.resolves(expected);
- const result = await worker._getWork();
+ const result = await worker._getWork(stubCtx);
assert.deepStrictEqual(result, expected);
assert.strictEqual(worker.inFlight.length, expected.length);
});
Promise.reject('bad'),
Promise.resolve('second'),
];
- const result = await worker._getWork();
+ const result = await worker._getWork(stubCtx);
assert(!worker.promiseGiver.called);
assert.deepStrictEqual(result, []);
});
assert.strictEqual(worker._getWork.callCount, 1);
assert.strictEqual(worker._recurr.callCount, 1);
});
+ it('covers error', async function () {
+ const expected = new Error('blah');
+ stubDb.context.restore();
+ sinon.stub(stubDb, 'context').rejects(expected);
+ await worker.process();
+ assert.strictEqual(worker._getWork.callCount, 0);
+ assert.strictEqual(worker._recurr.callCount, 1);
+ });
}); // process
}); // Worker