initial commit
[squeep-amqp-helper] / test / lib / base.js
1 /* eslint-env mocha */
2 /* eslint-disable node/no-unpublished-require */
3 'use strict';
4 const assert = require('assert');
5 const sinon = require('sinon');
6 const Base = require('../../lib/base');
7 const amqp = require('amqplib');
8 const { StubLogger } = require('@squeep/test-helper');
9
10 describe('Base', function () {
11 let logger, options, base;
12 const expectedException = new Error('oh no');
13
/**
 * Per-test fixture: creates a fresh stub logger and options, replaces
 * amqplib's connect() with a stub resolving to a fake connection, and
 * constructs the Base instance under test.
 */
beforeEach(function () {
  logger = new StubLogger();
  logger._reset();
  options = {
    amqp: {
      url: 'amqp://user:password@rabbitmq.int:5672',
    },
  };

  // Fake confirm channel; every method is an independent sinon stub.
  const channel = {
    on: sinon.stub(),
    checkQueue: sinon.stub(),
    assertExchange: sinon.stub(),
    assertQueue: sinon.stub(),
    bindQueue: sinon.stub(),
    prefetch: sinon.stub(),
    consume: sinon.stub(),
    recover: sinon.stub(),
    close: sinon.stub(),
  };

  // Fake connection returned by the stubbed amqp.connect().
  const connection = {
    createConfirmChannel: sinon.stub().resolves(channel),
    connection: {
      stream: {
        writable: true,
      },
    },
    on: sinon.stub(),
    close: sinon.stub(),
  };

  sinon.stub(amqp, 'connect').resolves(connection);
  base = new Base(logger, options);
});
44
// Restore everything sinon replaced during the test so stubs do not leak
// into the next one.
afterEach(function restoreSinon() {
  sinon.restore();
});
48
// connect(): _connectAMQP and close are stubbed, so only connect()'s own
// success/failure handling is exercised.
describe('connect', function () {
  beforeEach(function () {
    ['_connectAMQP', 'close'].forEach((method) => sinon.stub(base, method));
  });

  it('covers success', async function () {
    await base.connect();
    assert.ok(base._connectAMQP.called);
  });

  it('covers failure', async function () {
    base._connectAMQP.rejects(expectedException);
    const pendingConnect = base.connect();
    await assert.rejects(pendingConnect, expectedException);
    assert.ok(base._connectAMQP.called);
    assert.ok(base.close.called);
    assert.ok(base.logger.error.called);
  });
}); // connect
66
// _connectAMQP(): both establishment steps are stubbed; the test only checks
// that each one gets invoked.
describe('_connectAMQP', function () {
  beforeEach(function () {
    ['_establishAMQPConnection', '_establishAMQPChannel'].forEach((method) => sinon.stub(base, method));
  });

  it('covers', async function () {
    await base._connectAMQP();
    const { _establishAMQPConnection, _establishAMQPChannel } = base;
    assert.ok(_establishAMQPConnection.called);
    assert.ok(_establishAMQPChannel.called);
  });
}); // _connectAMQP
78
// _establishAMQPConnection(): runs against the stubbed amqp.connect().
describe('_establishAMQPConnection', function () {
  it('covers success', async function () {
    await base._establishAMQPConnection();
    assert.ok(amqp.connect.called);
    // The fake connection's on() stub records listener registration.
    assert.ok(base.connection.on.called);
  });

  it('covers failure', async function () {
    amqp.connect.rejects(expectedException);
    const pendingConnection = base._establishAMQPConnection();
    await assert.rejects(pendingConnection, expectedException);
  });
}); // _establishAMQPConnection
90
// _establishAMQPChannel(): needs a connection first, so one is established
// (against the fake) before every test.
describe('_establishAMQPChannel', function () {
  beforeEach(async function () {
    await base._establishAMQPConnection();
  });

  it('covers success', async function () {
    await base._establishAMQPChannel();
    assert.ok(base.connection.createConfirmChannel.called);
    assert.ok(base.channel.on.called);
  });

  it('covers failure', async function () {
    base.connection.createConfirmChannel.rejects(expectedException);
    const pendingChannel = base._establishAMQPChannel();
    await assert.rejects(pendingChannel, expectedException);
  });
}); // _establishAMQPChannel
105
// Event handler tests: each handler is invoked directly and checked for the
// logger level it hits and, for connection close, the state it clears.
describe('AMQP Event Handlers', function () {
  let fatalErr, nonFatalErr;

  beforeEach(function () {
    // Fresh error-like objects per test, named for the branch they drive.
    fatalErr = { code: 501 };
    nonFatalErr = { code: 200 };
    // Placeholder state on the instance before the handlers run.
    base.connection = {};
    base.channel = {};
    base.queueConsumerTags = { 'consumer': 'placeholder' };
  });

  describe('_eventAMQPConnectionClose', function () {
    it('fatal behavior', function () {
      base._eventAMQPConnectionClose(fatalErr);
      const { connection, channel } = base;
      assert.strictEqual(connection, undefined);
      assert.strictEqual(channel, undefined);
      assert.ok(base.logger.error.called);
    });

    it('non-fatal behavior', function () {
      base._eventAMQPConnectionClose(nonFatalErr);
      const { connection, channel } = base;
      assert.strictEqual(connection, undefined);
      assert.strictEqual(channel, undefined);
      assert.ok(base.logger.debug.called);
    });
  }); // _eventAMQPConnectionClose

  describe('_eventAMQPConnectionError', function () {
    it('logs event', function () {
      base._eventAMQPConnectionError(fatalErr);
      assert.ok(base.logger.error.called);
    });
  }); // _eventAMQPConnectionError

  describe('_eventAMQPConnectionBlocked', function () {
    it('logs event', function () {
      base._eventAMQPConnectionBlocked('because');
      assert.ok(base.logger.debug.called);
    });
  }); // _eventAMQPConnectionBlocked

  describe('_eventAMQPConnectionUnblocked', function () {
    it('logs event', function () {
      base._eventAMQPConnectionUnblocked();
      assert.ok(base.logger.debug.called);
    });
  }); // _eventAMQPConnectionUnblocked

  describe('_eventAMQPChannelClose', function () {
    it('logs event', function () {
      base._eventAMQPChannelClose();
      assert.ok(base.logger.debug.called);
    });

    it('logs fatal event', function () {
      base._eventAMQPChannelClose(fatalErr);
      assert.ok(base.logger.error.called);
    });
  }); // _eventAMQPChannelClose

  describe('_eventAMQPChannelError', function () {
    it('logs event', function () {
      base._eventAMQPChannelError(fatalErr);
      assert.ok(base.logger.error.called);
    });
  }); // _eventAMQPChannelError

  describe('_eventAMQPChannelReturn', function () {
    it('logs event', function () {
      base._eventAMQPChannelReturn('msg');
      assert.ok(base.logger.error.called);
    });
  }); // _eventAMQPChannelReturn

  describe('_eventAMQPChannelDrain', function () {
    it('logs event', function () {
      base._eventAMQPChannelDrain();
      assert.ok(base.logger.debug.called);
    });
  }); // _eventAMQPChannelDrain
}); // AMQP Event Handlers
189
190 describe('AMQP Plumbing Naming', function () {
191 let name, prefix;
// Reset the entity name and capture the prefix configured on the instance
// before any test overrides base.options.prefix.
beforeEach(function captureNamingFixture() {
  ({ prefix } = base.options);
  name = 'name';
});
/**
 * Build the list of parts a generated name is expected to contain: the
 * fixture `name` followed by the dot-separated pieces of each listed option
 * that currently has a truthy value on `base.options`.
 * @param {String[]} optionNames keys of base.options to include, in order
 * @returns {String[]} expected name parts
 */
function _expectedParts(optionNames = []) {
  const expected = [name];
  optionNames.forEach((optionName) => {
    // Read once so the lint suppression sits on the line doing the lookup.
    const optionValue = base.options[optionName]; // eslint-disable-line security/detect-object-injection
    if (optionValue) {
      expected.push(...optionValue.split('.'));
    }
  });
  return expected;
}
/**
 * Assert that every expected part appears among the pieces of `value`
 * obtained by splitting it on `splitter`.
 * @param {String} value generated name to inspect
 * @param {String[]} expectedParts parts which must all be present
 * @param {String} splitter separator used to break up value
 */
function _assertParts(value, expectedParts, splitter = '.') {
  const parts = value.split(splitter);
  for (const part of expectedParts) {
    const isPresent = parts.includes(part);
    assert(isPresent, `missing ${part} expected [${expectedParts}] got [${parts}]`);
  }
}
/**
 * Assert that the captured `prefix` is not one of the pieces of `value`
 * when split on `splitter`.
 * @param {String} value generated name to inspect
 * @param {String} splitter separator used to break up value
 */
function _assertNoPrefix(value, splitter = '.') {
  const hasPrefixPart = value.split(splitter).includes(prefix);
  assert(!hasPrefixPart);
}
213
// _exchangeName(): result must contain the name and, when set, the prefix parts.
describe('_exchangeName', function () {
  it('names exchange', function () {
    const wantedParts = _expectedParts(['prefix']);
    const exchangeName = base._exchangeName(name);
    _assertParts(exchangeName, wantedParts);
  });

  it('covers no prefix', function () {
    const wantedParts = _expectedParts([]);
    base.options.prefix = undefined;
    const exchangeName = base._exchangeName(name);
    _assertParts(exchangeName, wantedParts);
    _assertNoPrefix(exchangeName);
  });
}); // _exchangeName
228
// _retryExchangeName(): result must contain the name plus prefix and retry
// suffix parts, or just the retry suffix parts once the prefix is cleared.
describe('_retryExchangeName', function () {
  it('names retry exchange', function () {
    const wantedParts = _expectedParts(['prefix', 'retrySuffix']);
    const retryExchangeName = base._retryExchangeName(name);
    _assertParts(retryExchangeName, wantedParts);
  });

  it('covers no prefix', function () {
    const wantedParts = _expectedParts(['retrySuffix']);
    base.options.prefix = undefined;
    const retryExchangeName = base._retryExchangeName(name);
    _assertParts(retryExchangeName, wantedParts);
    _assertNoPrefix(retryExchangeName);
  });
}); // _retryExchangeName
243
// _queueName(): result must contain the name plus prefix and queue suffix
// parts, or just the queue suffix parts once the prefix is cleared.
describe('_queueName', function () {
  it('names queue', function () {
    const wantedParts = _expectedParts(['prefix', 'queueSuffix']);
    const queueName = base._queueName(name);
    _assertParts(queueName, wantedParts);
  });

  it('covers no prefix', function () {
    const wantedParts = _expectedParts(['queueSuffix']);
    base.options.prefix = undefined;
    const queueName = base._queueName(name);
    _assertParts(queueName, wantedParts);
    _assertNoPrefix(queueName);
  });
}); // _queueName
258
// _retryQueueName(): result must contain the name plus prefix, retry suffix
// and queue suffix parts, minus the prefix once it is cleared.
describe('_retryQueueName', function () {
  it('names retry queue', function () {
    const wantedParts = _expectedParts(['prefix', 'retrySuffix', 'queueSuffix']);
    const retryQueueName = base._retryQueueName(name);
    _assertParts(retryQueueName, wantedParts);
  });

  it('covers no prefix', function () {
    const wantedParts = _expectedParts(['retrySuffix', 'queueSuffix']);
    base.options.prefix = undefined;
    const retryQueueName = base._retryQueueName(name);
    _assertParts(retryQueueName, wantedParts);
    _assertNoPrefix(retryQueueName);
  });
}); // _retryQueueName
273
// _retryQueuePolicyName(): the expected parts drop the leading name and gain
// a trailing '.*'; the result is split on the two-character sequence '\.'.
describe('_retryQueuePolicyName', function () {
  it('names retry wildcard queue for ttl policy', function () {
    const [, ...optionParts] = _expectedParts(['prefix', 'retrySuffix', 'queueSuffix']);
    const wantedParts = [...optionParts, '.*'];
    const policyName = base._retryQueuePolicyName(name);
    _assertParts(policyName, wantedParts, '\\.');
  });

  it('covers no prefix', function () {
    const [, ...optionParts] = _expectedParts(['retrySuffix', 'queueSuffix']);
    base.options.prefix = undefined;
    const wantedParts = [...optionParts, '.*'];
    const policyName = base._retryQueuePolicyName(name);
    _assertParts(policyName, wantedParts, '\\.');
    _assertNoPrefix(policyName, '\\.');
  });
}); // _retryQueuePolicyName
292 }); // AMQP Plumbing Naming
293
// establishAMQPPlumbing(): connects through the fake first, then stubs
// close() so the failure path can be observed without tearing anything down.
describe('establishAMQPPlumbing', function () {
  beforeEach(async function () {
    await base.connect();
    sinon.stub(base, 'close');
  });

  it('covers success', async function () {
    await base.establishAMQPPlumbing();
    const { assertExchange, assertQueue, bindQueue } = base.channel;
    assert.ok(assertExchange.called);
    assert.ok(assertQueue.called);
    assert.ok(bindQueue.called);
  });

  it('covers failure', async function () {
    base.channel.assertQueue.rejects(expectedException);
    const pendingPlumbing = base.establishAMQPPlumbing();
    await assert.rejects(pendingPlumbing, expectedException);
    assert.ok(base.close.called);
  });
}); // establishAMQPPlumbing
311
// close(): stub references are captured up front because some tests clear
// base.channel / base.connection before calling close().
describe('close', function () {
  let channelRecoverStub, channelCloseStub, connectionCloseStub;

  beforeEach(async function () {
    await base.connect();
    ({ recover: channelRecoverStub, close: channelCloseStub } = base.channel);
    ({ close: connectionCloseStub } = base.connection);
  });

  it('closes active connection and channel', async function () {
    await base.close();
    assert.ok(channelRecoverStub.called);
    assert.ok(channelCloseStub.called);
    assert.ok(connectionCloseStub.called);
  });

  it('covers no active channel', async function () {
    base.channel = undefined;
    await base.close();
    assert.ok(channelRecoverStub.notCalled);
    assert.ok(channelCloseStub.notCalled);
    assert.ok(connectionCloseStub.called);
  });

  it('covers no active connection or channel', async function () {
    base.channel = undefined;
    base.connection = undefined;
    await base.close();
    assert.ok(channelRecoverStub.notCalled);
    assert.ok(channelCloseStub.notCalled);
    assert.ok(connectionCloseStub.notCalled);
  });

  it('covers failure', async function () {
    channelCloseStub.rejects(expectedException);
    const pendingClose = base.close();
    await assert.rejects(pendingClose, expectedException);
  });
}); // close
346
// health(): checked after a connection has been established against the
// fake, whose connection.stream.writable is true.
describe('health', function () {
  beforeEach(async function () {
    await base._establishAMQPConnection();
  });

  it('checks connection is writable', function () {
    const isHealthy = base.health();
    assert.strictEqual(isHealthy, true);
  });
}); // health
356
// policyCommand(): only checks that something truthy comes back.
describe('policyCommand', function () {
  it('covers', function () {
    const command = base.policyCommand();
    assert.ok(command);
  });
}); // policyCommand
363
364 }); // Base