ignore invocation of worker process if already running
[websub-hub] / test / src / worker.js
1 /* eslint-env mocha */
2 'use strict';
3
4 const assert = require('assert');
5 const sinon = require('sinon'); // eslint-disable-line node/no-unpublished-require
6
7 const Worker = require('../../src/worker');
8 const Config = require('../../config');
9
10 const stubLogger = require('../stub-logger');
11 const stubDb = require('../stub-db');
12
13 const noExpectedException = 'did not get expected exception';
14
15 describe('Worker', function () {
16 let config;
17 let worker;
18 let promiseGiver;
19
20 beforeEach(function () {
21 config = new Config('test');
22 promiseGiver = sinon.stub();
23 worker = new Worker(stubLogger, stubDb, promiseGiver, config);
24 stubDb._reset();
25 });
26
27 afterEach(function () {
28 sinon.restore();
29 });
30
31 describe('constructor', function () {
32 it('instantiates', function () {
33 assert(worker);
34 });
35
36 it('requires a promiseGiver function', function () {
37 try {
38 worker = new Worker(stubLogger, stubDb, undefined, config);
39 assert.fail('should require function argument');
40 } catch (e) {
41 assert(e instanceof TypeError);
42 }
43 });
44 }); // constructor
45
46 describe('start', function () {
47 it('starts without polling', function () {
48 config.worker.pollingEnabled = false;
49 worker = new Worker(stubLogger, stubDb, promiseGiver, config);
50 worker.start();
51 assert.strictEqual(worker.running, false);
52 });
53 it('starts with polling', function () {
54 config.worker.pollingEnabled = true;
55 worker = new Worker(stubLogger, stubDb, promiseGiver, config);
56 sinon.stub(worker, '_recurr');
57 worker.start();
58 clearTimeout(worker.nextTimeout);
59 assert.strictEqual(worker.running, true);
60 });
61 }); // start
62
63 describe('stop', function () {
64 it('stops', function () {
65 worker = new Worker(stubLogger, stubDb, promiseGiver, config);
66 worker.start();
67 worker.stop();
68 assert.strictEqual(worker.running, false);
69 assert.strictEqual(worker.nextTimeout, undefined);
70 });
71 }); // stop
72
73 describe('watchedPromise', function () {
74 let promise;
75 it('watches a resolvable promise', async function () {
76 const res = 'yay';
77 promise = Promise.resolve(res);
78 const watched = Worker.watchedPromise(promise);
79 const result = await watched;
80 assert.strictEqual(result, res);
81 assert.strictEqual(watched.resolved, res);
82 assert(watched.isSettled);
83 });
84 it('watches a rejectable promise', async function () {
85 const rej = new Error('boo');
86 promise = Promise.reject(rej);
87 const watched = Worker.watchedPromise(promise);
88 try {
89 await watched;
90 assert.fail(noExpectedException);
91 } catch (e) {
92 assert.deepStrictEqual(e, rej);
93 assert.deepStrictEqual(watched.rejected, rej);
94 assert(watched.isSettled);
95 }
96 });
97 it('covers wrapped promise', async function () {
98 const res = 'yay';
99 promise = Promise.resolve(res);
100 const watched = Worker.watchedPromise(promise);
101 const rewatched = Worker.watchedPromise(watched);
102 const result = await rewatched;
103 assert.strictEqual(result, res);
104 assert.strictEqual(rewatched.resolved, res);
105 assert(rewatched.isSettled);
106 });
107 }); // watchedPromise
108
109 describe('_handleWatchedList', function () {
110 let handler;
111 beforeEach(function () {
112 handler = sinon.stub();
113 });
114 it('handled resolveds', function () {
115 worker.inFlight = [
116 { isSettled: false, resolved: undefined, rejected: undefined },
117 { isSettled: true, resolved: 'value', rejected: undefined },
118 { isSettled: true, resolved: undefined, rejected: 'error' },
119 { isSettled: false, resolved: undefined, rejected: undefined },
120 ];
121 const result = worker._handleWatchedList(handler);
122 assert.strictEqual(result, 2);
123 assert.strictEqual(worker.inFlight.length, 2);
124 assert.strictEqual(handler.callCount, 2);
125 });
126 }); // _handleWatchedList
127
128 describe('_getWork', function () {
129 let stubCtx;
130 beforeEach(function () {
131 stubCtx = {};
132 });
133 it('gets tasks', async function () {
134 const expected = [
135 Promise.resolve('first'),
136 Promise.reject('bad'),
137 Promise.resolve('second'),
138 ];
139 worker.promiseGiver.resolves(expected);
140 const result = await worker._getWork(stubCtx);
141 assert.deepStrictEqual(result, expected);
142 assert.strictEqual(worker.inFlight.length, expected.length);
143 });
144 it('covers none wanted', async function () {
145 worker.concurrency = 3;
146 worker.inFlight = [
147 Promise.resolve('first'),
148 Promise.reject('bad'),
149 Promise.resolve('second'),
150 ];
151 const result = await worker._getWork(stubCtx);
152 assert(!worker.promiseGiver.called);
153 assert.deepStrictEqual(result, []);
154 });
155 }); // _getWork
156
157 describe('_watchedHandler', function () {
158 it('covers resolved', function () {
159 worker._watchedHandler('resolved', undefined);
160 });
161 it('covers rejected', function () {
162 worker._watchedHandler(undefined, 'rejected');
163 });
164 }); // _watchedHandler
165
166 describe('_recurr', function () {
167 it('covers', function (done) {
168 worker.recurrSleepMs = 10;
169 this.slow(worker.recurrSleepMs * 3);
170 sinon.stub(worker, 'process').callsFake(done);
171 worker.running = true;
172 worker._recurr();
173 });
174 it('covers not running', function () {
175 worker.running = false;
176 worker._recurr();
177 });
178 }); // _recurr
179
180 describe('process', function () {
181 beforeEach(function () {
182 sinon.stub(worker, '_getWork');
183 sinon.stub(worker, '_recurr');
184 });
185 it('covers', async function () {
186 worker.inFlight = [
187 Worker.watchedPromise(Promise.resolve('one')),
188 Worker.watchedPromise(Promise.reject('foo')),
189 ];
190 await worker.process();
191 assert.strictEqual(worker._getWork.callCount, 2);
192 assert.strictEqual(worker._recurr.callCount, 1);
193 });
194 it('covers no work', async function () {
195 await worker.process();
196 assert.strictEqual(worker._getWork.callCount, 1);
197 assert.strictEqual(worker._recurr.callCount, 1);
198 });
199 it('covers error', async function () {
200 const expected = new Error('blah');
201 stubDb.context.restore();
202 sinon.stub(stubDb, 'context').rejects(expected);
203 await worker.process();
204 assert.strictEqual(worker._getWork.callCount, 0);
205 assert.strictEqual(worker._recurr.callCount, 1);
206 });
207 it('covers double invocation', async function () {
208 const snooze = async (ms) => new Promise((resolve) => setTimeout(resolve, ms));
209
210 this.slow(300);
211 worker.inFlight = [
212 Worker.watchedPromise(snooze(100)),
213 ];
214
215 await Promise.all([worker.process(), worker.process()]);
216 assert.strictEqual(worker._getWork.callCount, 2);
217 assert.strictEqual(worker._recurr.callCount, 1);
218 });
219 }); // process
220
221 }); // Worker