## [Unreleased]
+### Fixed
+
+- Prevent task processor from being re-invoked if it is already running.
+
## [v1.1.3] - 2021-08-13
### Fixed
this.logger.debug(_scope, 'called', { ctx });
// N.B. no await on this
- this.communication.worker.process();
+ this.communication.worker.process().catch((e) => {
+ this.logger.error(_scope, 'failed', { error: e, ctx });
+ });
res.end();
this.logger.info(_scope, 'invoked worker process', { ctx });
this.inFlight = []; // Our work heap of Promises
this.nextTimeout = undefined; // Allow clearTimeout() to reset waiting period.
this.running = false;
+ this.isProcessing = false; // Only let one process() method execute on the work heap at a time
}
/**
async process() {
const _scope = _fileScope('process');
- this.logger.debug(_scope, 'called', {});
+ this.logger.debug(_scope, 'called', { isProcessing: this.isProcessing });
+
+
+ if (this.isProcessing) {
+ return;
+ }
+ this.isProcessing = true;
// Interrupt any pending sleep, if we were called out of timeout-cycle.
clearTimeout(this.nextTimeout);
// No more work, wait a while and retry
this._recurr();
+
+ this.isProcessing = false;
}
}
describe('processTasks', function () {
it('covers', async function () {
- sinon.stub(manager.communication.worker, 'process');
+ sinon.stub(manager.communication.worker, 'process').resolves();
+ await manager.processTasks(res, ctx);
+ assert(manager.communication.worker.process.called);
+ assert(res.end.called);
+ });
+ it('covers error', async function () {
+ sinon.stub(manager.communication.worker, 'process').rejects();
await manager.processTasks(res, ctx);
assert(manager.communication.worker.process.called);
assert(res.end.called);
assert.strictEqual(worker._getWork.callCount, 0);
assert.strictEqual(worker._recurr.callCount, 1);
});
+ it('covers double invocation', async function () {
+ const snooze = async (ms) => new Promise((resolve) => setTimeout(resolve, ms));
+
+ this.slow(300);
+ worker.inFlight = [
+ Worker.watchedPromise(snooze(100)),
+ ];
+
+ await Promise.all([worker.process(), worker.process()]);
+ assert.strictEqual(worker._getWork.callCount, 2);
+ assert.strictEqual(worker._recurr.callCount, 1);
+ });
}); // process
}); // Worker