worker tasks now share one db context, rather than one per task
authorJustin Wind <justin.wind+git@gmail.com>
Fri, 13 Aug 2021 18:57:20 +0000 (11:57 -0700)
committerJustin Wind <justin.wind+git@gmail.com>
Fri, 13 Aug 2021 18:57:20 +0000 (11:57 -0700)
When using Postgres, each context is a connection.  Moved the context
creation into the worker so that all tasks can use one connection when
its processing, which is much healthier for the database.

CHANGELOG.md
README.md
src/communication.js
src/worker.js
test/src/communication.js
test/src/worker.js

index 71d4dfe2fcfe0de39d0b1c40af8db594ef80ae67..7e97c9cd5e83a4f1088c7f63f9d387abf96014bd 100644 (file)
@@ -4,6 +4,10 @@ Releases and notable changes to this project are documented here.
 
 ## [Unreleased]
 
+### Fixed
+
+- Worker tasks, such as delivering content updates, now share one database context.  This reduces the connection load for Postgres backends, affording greater scalability.
+
 ## [v1.1.2] - 2021-08-11
 
 ### Added
index 32ca1bb949856717bb4e5a1e1b69b441d3800738..c7e805a64faf856245c3896ce23ffcdf188729cb 100644 (file)
--- a/README.md
+++ b/README.md
@@ -135,6 +135,7 @@ This implementation is built atop an in-house API framework, for Reasons.  It wo
     - schema-version-helper.js - schema migrations aide
     - postgres/
       - index.js - PostgreSQL implementation
+      - listener.js - notify/listen connection to support topic content caching
       - sql/ - statements and schemas
     - sqlite/
       - index.js - SQLite implementation
index 72f2642a003e465aab42909ce13f90b8fd93694c..b67e2c731ff122e09f4e68557a8ddf0b1dec927c 100644 (file)
@@ -49,7 +49,7 @@ class Communication {
       return response;
     });
 
-    this.worker = new Worker(logger, this.workFeed.bind(this), options);
+    this.worker = new Worker(logger, db, this.workFeed.bind(this), options);
     this.worker.start();
   }
 
@@ -647,10 +647,11 @@ class Communication {
 
   /**
    * 
+   * @param {*} dbCtx
    * @param {Number} wanted maximum tasks to claim
    * @returns {Promise<void>[]}
    */
-  async workFeed(wanted) {
+  async workFeed(dbCtx, wanted) {
     const _scope = _fileScope('workFeed');
     const inProgress = [];
     const requestId = common.requestId();
@@ -661,33 +662,29 @@ class Communication {
     this.logger.debug(_scope, 'called', { wanted });
 
     try {
-      await this.db.context(async (dbCtx) => {
-        if (wanted > 0) {
-          // Update topics before anything else.
-          const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
-          // Each task gets a new context, as these map to connections in some dbs.
-          // This dbCtx goes away after launching the processing functions, so would not be available to tasks.
-          topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId)));
-          inProgress.push(...topicFetchPromises);
-          wanted -= topicFetchPromises.length;
-        }
+      if (wanted > 0) {
+        // Update topics before anything else.
+        const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
+        topicFetchPromises = topicFetchIds.map((id) => this.topicFetchProcess(dbCtx, id, requestId));
+        inProgress.push(...topicFetchPromises);
+        wanted -= topicFetchPromises.length;
+      }
 
-        if (wanted > 0) {
-          // Then any pending verifications.
-          const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
-          verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId)));
-          inProgress.push(...verificationPromises);
-          wanted -= verificationPromises.length;
-        }
+      if (wanted > 0) {
+        // Then any pending verifications.
+        const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
+        verificationPromises = verifications.map((id) => this.verificationProcess(dbCtx, id, requestId));
+        inProgress.push(...verificationPromises);
+        wanted -= verificationPromises.length;
+      }
 
-        if (wanted > 0) {
-          // Finally dole out content.
-          const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
-          updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId)));
-          inProgress.push(...updatePromises);
-          wanted -= updatePromises.length;
-        }
-      }); // dbCtx
+      if (wanted > 0) {
+        // Finally dole out content.
+        const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
+        updatePromises = updates.map((id) => this.subscriptionDeliveryProcess(dbCtx, id, requestId));
+        inProgress.push(...updatePromises);
+        wanted -= updatePromises.length;
+      }
     } catch (e) {
       this.logger.error(_scope, 'failed', { error: e });
       // do not re-throw, return what we've claimed so far
index ca77369aa3646fa871547bc67f8bd5c49e65b6d4..9b6fd7ab882752f00dc76ae693db8980e8eadc76 100644 (file)
@@ -12,22 +12,25 @@ const _fileScope = common.fileScope(__filename);
 
 /**
  * @callback Worker~promiseGiver
+ * @param {*} dbCtx
  * @param {number} atMost
  * @returns {Promise<void>[]}
  */
 
 class Worker {
   /**
-   * @param {object} logger 
-   * @param {Worker~promiseGiver} promiseGiver 
+   * @param {object} logger
+   * @param {object} db
+   * @param {Worker~promiseGiver} promiseGiver
    * @param {object} options
    * @param {object} options.worker
    * @param {object} options.worker.pollingEnabled
    * @param {number} options.worker.recurrSleepMs
    * @param {number} options.worker.concurrency
    */
-  constructor(logger, promiseGiver, options) {
+  constructor(logger, db, promiseGiver, options) {
     this.logger = logger;
+    this.db = db;
     this.options = options;
     if (!promiseGiver || typeof promiseGiver !== 'function') {
       throw new TypeError('function required');
@@ -129,14 +132,15 @@ class Worker {
 
   /**
    * Refill the workpool with our special promises.
+   * @param {*} dbCtx
    * @returns {Promise[]}
    */
-  async _getWork() {
+  async _getWork(dbCtx) {
     const _scope = _fileScope('_getWork');
     let newPromises = [];
     const wanted = this.concurrency - this.inFlight.length;
     if (wanted > 0) {
-      newPromises = await this.promiseGiver(wanted);
+      newPromises = await this.promiseGiver(dbCtx, wanted);
       newPromises = newPromises.map((p) => Worker.watchedPromise(p));
       common.stackSafePush(this.inFlight, newPromises);
     }
@@ -178,29 +182,38 @@ class Worker {
     // Interrupt any pending sleep, if we were called out of timeout-cycle.
     clearTimeout(this.nextTimeout);
 
-    // Try to fill the hopper
-    await this._getWork();
-
-    while (this.inFlight.length > 0) {
-      /* Wait for one or more to be resolved.
-       * We don't care what the result was, as we have to scan the list
-       * for all settled promises anyhow, and our wrapper has stored the
-       * results.
-       */
-      try {
-        await Promise.race(this.inFlight);
-      } catch (e) {
-        // NOP here, as we'll handle it when we scan the list
-      }
-      this.logger.debug(_scope, { msg: 'race completed' });
-
-      // Address settled promises..
-      const settled = this._handleWatchedList(this._watchedHandler.bind(this));
-      this.logger.debug(_scope, { settled });
-      
-      // Try to fill the vacancy
-      // TODO: maybe rate-limit this call based on slot availability
-      await this._getWork();
+    // Share one db connection for all tasks.
+    try {
+      await this.db.context(async (dbCtx) => {
+
+        // Try to fill the hopper
+        await this._getWork(dbCtx);
+
+        while (this.inFlight.length > 0) {
+          /* Wait for one or more to be resolved.
+          * We don't care what the result was, as we have to scan the list
+          * for all settled promises anyhow, and our wrapper has stored the
+          * results.
+          */
+          try {
+            await Promise.race(this.inFlight);
+          } catch (e) {
+            // NOP here, as we'll handle it when we scan the list
+          }
+          this.logger.debug(_scope, { msg: 'race completed' });
+
+          // Address settled promises..
+          const settled = this._handleWatchedList(this._watchedHandler.bind(this));
+          this.logger.debug(_scope, { settled });
+
+          // Try to fill the vacancy
+          // TODO: maybe rate-limit this call based on slot availability
+          await this._getWork(dbCtx);
+        }
+      }); // dbCtx
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e });
+      // Try again later anyhow.
     }
 
     // No more work, wait a while and retry
index 7c622274a8dfa0d00e2350c168bc356559d6eb24..ca5f34e92c536458171c18fa8cd4f78976a5bf11 100644 (file)
@@ -837,8 +837,9 @@ describe('Communication', function () {
   }); // verificationClaimAndProcessById
 
   describe('workFeed', function () {
-    let wanted;
+    let stubCtx, wanted;
     beforeEach(function () {
+      stubCtx = {};
       sinon.stub(communication, 'topicFetchProcess');
       sinon.stub(communication, 'verificationProcess');
       sinon.stub(communication, 'subscriptionDeliveryProcess');
@@ -853,12 +854,13 @@ describe('Communication', function () {
       const expectedLength = [topicIds, verificationIds, subscriptionIds].map((x) => x.length).reduce((a, b) => a + b, 0);
       wanted = 10;
 
-      const result = await communication.workFeed(wanted);
+      const result = await communication.workFeed(stubCtx, wanted);
 
       assert.strictEqual(result.length, expectedLength);
     });
     it('covers no wanted work', async function () {
-      const result = await communication.workFeed(0);
+      wanted = 0;
+      const result = await communication.workFeed(stubCtx, wanted);
       assert.strictEqual(result.length, 0);
       assert(!communication.db.topicFetchClaim.called);
       assert(!communication.db.verificationClaim.called);
@@ -871,7 +873,7 @@ describe('Communication', function () {
       const expectedLength = topicIds.length;
       wanted = 10;
 
-      const result = await communication.workFeed(wanted);
+      const result = await communication.workFeed(stubCtx, wanted);
 
       assert.strictEqual(result.length, expectedLength);
     });
index a9047a0e0ccef3e5013a136cf437dab8c34786cc..32bd0656665bd854933240eb5323a440e1e438c5 100644 (file)
@@ -8,6 +8,7 @@ const Worker = require('../../src/worker');
 const Config = require('../../config');
 
 const stubLogger = require('../stub-logger');
+const stubDb = require('../stub-db');
 
 const noExpectedException = 'did not get expected exception';
 
@@ -19,7 +20,8 @@ describe('Worker', function () {
   beforeEach(function () {
     config = new Config('test');
     promiseGiver = sinon.stub();
-    worker = new Worker(stubLogger, promiseGiver, config);
+    worker = new Worker(stubLogger, stubDb, promiseGiver, config);
+    stubDb._reset();
   });
 
   afterEach(function () {
@@ -33,7 +35,7 @@ describe('Worker', function () {
   
     it('requires a promiseGiver function', function () {
       try {
-        worker = new Worker(stubLogger, undefined, config);
+        worker = new Worker(stubLogger, stubDb, undefined, config);
         assert.fail('should require function argument');
       } catch (e) {
         assert(e instanceof TypeError);
@@ -44,13 +46,13 @@ describe('Worker', function () {
   describe('start', function () {
     it('starts without polling', function () {
       config.worker.pollingEnabled = false;
-      worker = new Worker(stubLogger, promiseGiver, config);
+      worker = new Worker(stubLogger, stubDb, promiseGiver, config);
       worker.start();
       assert.strictEqual(worker.running, false);
     });
     it('starts with polling', function () {
       config.worker.pollingEnabled = true;
-      worker = new Worker(stubLogger, promiseGiver, config);
+      worker = new Worker(stubLogger, stubDb, promiseGiver, config);
       sinon.stub(worker, '_recurr');
       worker.start();
       clearTimeout(worker.nextTimeout);
@@ -60,7 +62,7 @@ describe('Worker', function () {
 
   describe('stop', function () {
     it('stops', function () {
-      worker = new Worker(stubLogger, promiseGiver, config);
+      worker = new Worker(stubLogger, stubDb, promiseGiver, config);
       worker.start();
       worker.stop();
       assert.strictEqual(worker.running, false);
@@ -124,6 +126,10 @@ describe('Worker', function () {
   }); // _handleWatchedList
 
   describe('_getWork', function () {
+    let stubCtx;
+    beforeEach(function () {
+      stubCtx = {};
+    });
     it('gets tasks', async function () {
       const expected = [
         Promise.resolve('first'),
@@ -131,7 +137,7 @@ describe('Worker', function () {
         Promise.resolve('second'),
       ];
       worker.promiseGiver.resolves(expected);
-      const result = await worker._getWork();
+      const result = await worker._getWork(stubCtx);
       assert.deepStrictEqual(result, expected);
       assert.strictEqual(worker.inFlight.length, expected.length);
     });
@@ -142,7 +148,7 @@ describe('Worker', function () {
         Promise.reject('bad'),
         Promise.resolve('second'),
       ];
-      const result = await worker._getWork();
+      const result = await worker._getWork(stubCtx);
       assert(!worker.promiseGiver.called);
       assert.deepStrictEqual(result, []);
     });
@@ -190,6 +196,14 @@ describe('Worker', function () {
       assert.strictEqual(worker._getWork.callCount, 1);
       assert.strictEqual(worker._recurr.callCount, 1);
     });
+    it('covers error', async function () {
+      const expected = new Error('blah');
+      stubDb.context.restore();
+      sinon.stub(stubDb, 'context').rejects(expected);
+      await worker.process();
+      assert.strictEqual(worker._getWork.callCount, 0);
+      assert.strictEqual(worker._recurr.callCount, 1);
+    });
   }); // process
 
 }); // Worker