From fe3a4d96d559edcc30c2047109cd7b027e6258eb Mon Sep 17 00:00:00 2001 From: Justin Wind Date: Mon, 16 Aug 2021 11:38:48 -0700 Subject: [PATCH] ignore invocation of worker process if already running also catch any errors thrown when invoking by admin api --- CHANGELOG.md | 4 ++++ src/manager.js | 4 +++- src/worker.js | 11 ++++++++++- test/src/manager.js | 8 +++++++- test/src/worker.js | 12 ++++++++++++ 5 files changed, 36 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0378b17..05ed31d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ Releases and notable changes to this project are documented here. ## [Unreleased] +### Fixed + +- Prevent task processor from being re-invoked if it is already running. + ## [v1.1.3] - 2021-08-13 ### Fixed diff --git a/src/manager.js b/src/manager.js index d7a07f1..fddc8e4 100644 --- a/src/manager.js +++ b/src/manager.js @@ -666,7 +666,9 @@ class Manager { this.logger.debug(_scope, 'called', { ctx }); // N.B. no await on this - this.communication.worker.process(); + this.communication.worker.process().catch((e) => { + this.logger.error(_scope, 'failed', { error: e, ctx }); + }); res.end(); this.logger.info(_scope, 'invoked worker process', { ctx }); diff --git a/src/worker.js b/src/worker.js index 9b6fd7a..4f566ad 100644 --- a/src/worker.js +++ b/src/worker.js @@ -42,6 +42,7 @@ class Worker { this.inFlight = []; // Our work heap of Promises this.nextTimeout = undefined; // Allow clearTimeout() to reset waiting period. this.running = false; + this.isProcessing = false; // Only let one process() method execute on the work heap at a time } /** @@ -177,7 +178,13 @@ class Worker { async process() { const _scope = _fileScope('process'); - this.logger.debug(_scope, 'called', {}); + this.logger.debug(_scope, 'called', { isProcessing: this.isProcessing }); + + + if (this.isProcessing) { + return; + } + this.isProcessing = true; // Interrupt any pending sleep, if we were called out of timeout-cycle. clearTimeout(this.nextTimeout); @@ -218,6 +225,8 @@ class Worker { // No more work, wait a while and retry this._recurr(); + + this.isProcessing = false; } } diff --git a/test/src/manager.js b/test/src/manager.js index d8c0921..7870a55 100644 --- a/test/src/manager.js +++ b/test/src/manager.js @@ -609,7 +609,13 @@ describe('Manager', function () { describe('processTasks', function () { it('covers', async function () { - sinon.stub(manager.communication.worker, 'process'); + sinon.stub(manager.communication.worker, 'process').resolves(); + await manager.processTasks(res, ctx); + assert(manager.communication.worker.process.called); + assert(res.end.called); + }); + it('covers error', async function () { + sinon.stub(manager.communication.worker, 'process').rejects(); await manager.processTasks(res, ctx); assert(manager.communication.worker.process.called); assert(res.end.called); diff --git a/test/src/worker.js b/test/src/worker.js index 32bd065..5c97c0b 100644 --- a/test/src/worker.js +++ b/test/src/worker.js @@ -204,6 +204,18 @@ describe('Worker', function () { assert.strictEqual(worker._getWork.callCount, 0); assert.strictEqual(worker._recurr.callCount, 1); }); + it('covers double invocation', async function () { + const snooze = async (ms) => new Promise((resolve) => setTimeout(resolve, ms)); + + this.slow(300); + worker.inFlight = [ + Worker.watchedPromise(snooze(100)), + ]; + + await Promise.all([worker.process(), worker.process()]); + assert.strictEqual(worker._getWork.callCount, 2); + assert.strictEqual(worker._recurr.callCount, 1); + }); }); // process }); // Worker -- 2.45.2