ignore invocation of worker process if already running
authorJustin Wind <justin.wind+git@gmail.com>
Mon, 16 Aug 2021 18:38:48 +0000 (11:38 -0700)
committerJustin Wind <justin.wind+git@gmail.com>
Mon, 16 Aug 2021 18:40:13 +0000 (11:40 -0700)
also catch any errors thrown when invoking by admin api

CHANGELOG.md
src/manager.js
src/worker.js
test/src/manager.js
test/src/worker.js

index 0378b17264c01b6b390e76aba426144260f4416d..05ed31dd2cb52ca7cc1be1a74af526f0bf5117ff 100644 (file)
@@ -4,6 +4,10 @@ Releases and notable changes to this project are documented here.
 
 ## [Unreleased]
 
+### Fixed
+
+- Prevent task processor from being re-invoked if it is already running.
+
 ## [v1.1.3] - 2021-08-13
 
 ### Fixed
index d7a07f117b9eae96aebb808382625d237409732f..fddc8e45ae6f272a346c2131a05d909c190d03d9 100644 (file)
@@ -666,7 +666,9 @@ class Manager {
     this.logger.debug(_scope, 'called', { ctx });
 
     // N.B. no await on this
-    this.communication.worker.process();
+    this.communication.worker.process().catch((e) => {
+      this.logger.error(_scope, 'failed', { error: e, ctx });
+    });
 
     res.end();
     this.logger.info(_scope, 'invoked worker process', { ctx });
index 9b6fd7ab882752f00dc76ae693db8980e8eadc76..4f566ad9f97209bcc538692cf87b6e412ccbe877 100644 (file)
@@ -42,6 +42,7 @@ class Worker {
     this.inFlight = []; // Our work heap of Promises  
     this.nextTimeout = undefined; // Allow clearTimeout() to reset waiting period.
     this.running = false;
+    this.isProcessing = false; // Only let one process() method execute on the work heap at a time
   }
 
   /**
@@ -177,7 +178,13 @@ class Worker {
   async process() {
     const _scope = _fileScope('process');
 
-    this.logger.debug(_scope, 'called', {});
+    this.logger.debug(_scope, 'called', { isProcessing: this.isProcessing });
+
+
+    if (this.isProcessing) {
+      return;
+    }
+    this.isProcessing = true;
 
     // Interrupt any pending sleep, if we were called out of timeout-cycle.
     clearTimeout(this.nextTimeout);
@@ -218,6 +225,8 @@ class Worker {
 
     // No more work, wait a while and retry
     this._recurr();
+
+    this.isProcessing = false;
   }
 
 }
index d8c09219c2cf323ecea93917013f597573588128..7870a55b45b62df6d4a8c3192501467c20b1e5c1 100644 (file)
@@ -609,7 +609,13 @@ describe('Manager', function () {
 
   describe('processTasks', function () {
     it('covers', async function () {
-      sinon.stub(manager.communication.worker, 'process');
+      sinon.stub(manager.communication.worker, 'process').resolves();
+      await manager.processTasks(res, ctx);
+      assert(manager.communication.worker.process.called);
+      assert(res.end.called);
+    });
+    it('covers error', async function () {
+      sinon.stub(manager.communication.worker, 'process').rejects();
       await manager.processTasks(res, ctx);
       assert(manager.communication.worker.process.called);
       assert(res.end.called);
index 32bd0656665bd854933240eb5323a440e1e438c5..5c97c0b24dd33de7dabdceb02ed298320d971188 100644 (file)
@@ -204,6 +204,18 @@ describe('Worker', function () {
       assert.strictEqual(worker._getWork.callCount, 0);
       assert.strictEqual(worker._recurr.callCount, 1);
     });
+    it('covers double invocation', async function () {
+      const snooze = async (ms) => new Promise((resolve) => setTimeout(resolve, ms));
+
+      this.slow(300);
+      worker.inFlight = [
+        Worker.watchedPromise(snooze(100)),
+      ];
+
+      await Promise.all([worker.process(), worker.process()]);
+      assert.strictEqual(worker._getWork.callCount, 2);
+      assert.strictEqual(worker._recurr.callCount, 1);
+    });
   }); // process
 
 }); // Worker