2 /* eslint-disable node/no-unpublished-require */
4 const assert
= require('assert');
5 const sinon
= require('sinon');
6 const Base
= require('../../lib/base');
7 const amqp
= require('amqplib');
8 const { StubLogger
} = require('@squeep/test-helper');
10 describe('Base', function () {
11 let logger
, options
, base
;
12 const expectedException
= new Error('oh no');
14 beforeEach(function () {
15 logger
= new StubLogger()
19 url: 'amqp://user:password@rabbitmq.int:5672',
22 sinon
.stub(amqp
, 'connect').resolves({
23 createConfirmChannel: sinon
.stub().resolves({
25 checkQueue: sinon
.stub(),
26 assertExchange: sinon
.stub(),
27 assertQueue: sinon
.stub(),
28 bindQueue: sinon
.stub(),
29 prefetch: sinon
.stub(),
30 consume: sinon
.stub(),
31 recover: sinon
.stub(),
42 base
= new Base(logger
, options
);
45 afterEach(function () {
49 describe('connect', function () {
50 beforeEach(function () {
51 sinon
.stub(base
, '_connectAMQP');
52 sinon
.stub(base
, 'close');
54 it('covers success', async
function () {
56 assert(base
._connectAMQP
.called
);
58 it('covers failure', async
function () {
59 base
._connectAMQP
.rejects(expectedException
);
60 await assert
.rejects(base
.connect(), expectedException
);
61 assert(base
._connectAMQP
.called
);
62 assert(base
.close
.called
);
63 assert(base
.logger
.error
.called
);
describe('_connectAMQP', function () {
  // Stub out both setup steps so only the sequencing done by _connectAMQP is exercised.
  beforeEach(function () {
    ['_establishAMQPConnection', '_establishAMQPChannel']
      .forEach((step) => sinon.stub(base, step));
  });

  it('covers', async function () {
    await base._connectAMQP();

    const {
      _establishAMQPConnection: connectionStep,
      _establishAMQPChannel: channelStep,
    } = base;
    assert.ok(connectionStep.called);
    assert.ok(channelStep.called);
  });
}); // _connectAMQP
describe('_establishAMQPConnection', function () {
  it('covers success', async function () {
    await base._establishAMQPConnection();

    // amqp.connect is replaced with a stub by the suite-level beforeEach.
    assert.ok(amqp.connect.called);
    // The connection's on() stub must have been invoked at least once.
    assert.ok(base.connection.on.called);
  });

  it('covers failure', async function () {
    amqp.connect.rejects(expectedException);

    const attempt = base._establishAMQPConnection();
    await assert.rejects(attempt, expectedException);
  });
}); // _establishAMQPConnection
describe('_establishAMQPChannel', function () {
  // A channel is created from a connection, so establish the (stubbed) connection first.
  beforeEach(async function () {
    await base._establishAMQPConnection();
  });

  it('covers success', async function () {
    await base._establishAMQPChannel();

    const { connection, channel } = base;
    assert.ok(connection.createConfirmChannel.called);
    assert.ok(channel.on.called);
  });

  it('covers failure', async function () {
    base.connection.createConfirmChannel.rejects(expectedException);

    const attempt = base._establishAMQPChannel();
    await assert.rejects(attempt, expectedException);
  });
}); // _establishAMQPChannel
106 describe('AMQP Event Handlers', function () {
107 let fatalErr
, nonFatalErr
;
108 beforeEach(function () {
115 base
.connection
= {};
117 base
.queueConsumerTags
= { 'consumer': 'placeholder' };
describe('_eventAMQPConnectionClose', function () {
  // Both variants expect the connection and channel references to be cleared;
  // they differ only in which logger method must have been called.
  it('fatal behavior', function () {
    base._eventAMQPConnectionClose(fatalErr);

    const { connection, channel, logger: log } = base;
    assert.strictEqual(connection, undefined);
    assert.strictEqual(channel, undefined);
    assert.ok(log.error.called);
  });

  it('non-fatal behavior', function () {
    base._eventAMQPConnectionClose(nonFatalErr);

    const { connection, channel, logger: log } = base;
    assert.strictEqual(connection, undefined);
    assert.strictEqual(channel, undefined);
    assert.ok(log.debug.called);
  });
}); // _eventAMQPConnectionClose
describe('_eventAMQPConnectionError', function () {
  it('logs event', function () {
    base._eventAMQPConnectionError(fatalErr);

    // A connection error must be reported through logger.error.
    const { error: errorLog } = base.logger;
    assert.ok(errorLog.called);
  });
}); // _eventAMQPConnectionError
describe('_eventAMQPConnectionBlocked', function () {
  it('logs event', function () {
    // The handler is given an arbitrary reason string.
    base._eventAMQPConnectionBlocked('because');

    const { debug: debugLog } = base.logger;
    assert.ok(debugLog.called);
  });
}); // _eventAMQPConnectionBlocked
describe('_eventAMQPConnectionUnblocked', function () {
  it('logs event', function () {
    base._eventAMQPConnectionUnblocked();

    // The unblocked notification is expected at debug level.
    const { debug: debugLog } = base.logger;
    assert.ok(debugLog.called);
  });
}); // _eventAMQPConnectionUnblocked
describe('_eventAMQPChannelClose', function () {
  // Called with no error, the close is expected at debug level.
  it('logs event', function () {
    base._eventAMQPChannelClose();

    const { debug: debugLog } = base.logger;
    assert.ok(debugLog.called);
  });

  // Called with the fatal error fixture, the close is expected at error level.
  it('logs fatal event', function () {
    base._eventAMQPChannelClose(fatalErr);

    const { error: errorLog } = base.logger;
    assert.ok(errorLog.called);
  });
}); // _eventAMQPChannelClose
describe('_eventAMQPChannelError', function () {
  it('logs event', function () {
    base._eventAMQPChannelError(fatalErr);

    // A channel error must be reported through logger.error.
    const { error: errorLog } = base.logger;
    assert.ok(errorLog.called);
  });
}); // _eventAMQPChannelError
174 describe('_eventAMQPChannelReturn', function () {
175 it('logs event', function () {
177 base
._eventAMQPChannelReturn(msg
);
178 assert(base
.logger
.error
.called
);
180 }); // _eventAMQPChannelReturn
describe('_eventAMQPChannelDrain', function () {
  it('logs event', function () {
    base._eventAMQPChannelDrain();

    // The drain notification is expected at debug level.
    const { debug: debugLog } = base.logger;
    assert.ok(debugLog.called);
  });
}); // _eventAMQPChannelDrain
188 }); // AMQP Event Handlers
190 describe('AMQP Plumbing Naming', function () {
192 beforeEach(function () {
194 prefix
= base
.options
.prefix
;
/**
 * Build the list of components a generated AMQP name is expected to contain.
 *
 * The list starts with `name` (taken from the enclosing scope) and is followed
 * by the dot-separated pieces of every listed option that has a truthy value
 * on `base.options`. Unset or empty options contribute nothing.
 *
 * @param {string[]} [optionNames] keys of `base.options` whose values contribute parts
 * @returns {string[]} a fresh array the caller may mutate
 */
function _expectedParts(optionNames = []) {
  const expected = [name];
  // The callback parameter is deliberately not called `name`, so it cannot be
  // confused with the outer `name` used as the first element above.
  optionNames.forEach((optionName) => {
    const optionValue = base.options[optionName]; // eslint-disable-line security/detect-object-injection
    if (optionValue) {
      expected.push(...optionValue.split('.'));
    }
  });
  return expected;
}
/**
 * Assert that every expected part occurs among the pieces of `value` when it is
 * split on `splitter`. Order and extra pieces are not checked.
 *
 * @param {string} value generated name under test
 * @param {string[]} expectedParts pieces that must each be present
 * @param {string} [splitter] literal separator passed to String.prototype.split
 */
function _assertParts(value, expectedParts, splitter = '.') {
  // Fail with an assertion (not a TypeError from .split) when the method under
  // test returned something that is not a string.
  assert.strictEqual(typeof value, 'string', `expected a string to split, got ${typeof value}`);
  const parts = value.split(splitter);
  for (const part of expectedParts) {
    assert(parts.includes(part), `missing ${part} expected [${expectedParts}] got [${parts}]`);
  }
}
/**
 * Assert that `prefix` (taken from the enclosing scope) is not one of the
 * pieces of `value` when it is split on `splitter`.
 *
 * @param {string} value generated name under test
 * @param {string} [splitter] literal separator passed to String.prototype.split
 */
function _assertNoPrefix(value, splitter = '.') {
  // Fail with an assertion (not a TypeError from .split) when the method under
  // test returned something that is not a string.
  assert.strictEqual(typeof value, 'string', `expected a string to split, got ${typeof value}`);
  const parts = value.split(splitter);
  assert(!parts.includes(prefix), `unexpected prefix ${prefix} in [${parts}]`);
}
describe('_exchangeName', function () {
  it('names exchange', function () {
    const wanted = _expectedParts(['prefix']);

    const exchange = base._exchangeName(name);

    _assertParts(exchange, wanted);
  });

  it('covers no prefix', function () {
    const wanted = _expectedParts([]);
    // Clear the prefix so the generated name must not contain it.
    base.options.prefix = undefined;

    const exchange = base._exchangeName(name);

    _assertParts(exchange, wanted);
    _assertNoPrefix(exchange);
  });
}); // _exchangeName
describe('_retryExchangeName', function () {
  it('names retry exchange', function () {
    const wanted = _expectedParts(['prefix', 'retrySuffix']);

    const retryExchange = base._retryExchangeName(name);

    _assertParts(retryExchange, wanted);
  });

  it('covers no prefix', function () {
    const wanted = _expectedParts(['retrySuffix']);
    // Clear the prefix so the generated name must not contain it.
    base.options.prefix = undefined;

    const retryExchange = base._retryExchangeName(name);

    _assertParts(retryExchange, wanted);
    _assertNoPrefix(retryExchange);
  });
}); // _retryExchangeName
describe('_queueName', function () {
  it('names queue', function () {
    const wanted = _expectedParts(['prefix', 'queueSuffix']);

    const queue = base._queueName(name);

    _assertParts(queue, wanted);
  });

  it('covers no prefix', function () {
    const wanted = _expectedParts(['queueSuffix']);
    // Clear the prefix so the generated name must not contain it.
    base.options.prefix = undefined;

    const queue = base._queueName(name);

    _assertParts(queue, wanted);
    _assertNoPrefix(queue);
  });
}); // _queueName
describe('_retryQueueName', function () {
  it('names retry queue', function () {
    const wanted = _expectedParts(['prefix', 'retrySuffix', 'queueSuffix']);

    const retryQueue = base._retryQueueName(name);

    _assertParts(retryQueue, wanted);
  });

  it('covers no prefix', function () {
    const wanted = _expectedParts(['retrySuffix', 'queueSuffix']);
    // Clear the prefix so the generated name must not contain it.
    base.options.prefix = undefined;

    const retryQueue = base._retryQueueName(name);

    _assertParts(retryQueue, wanted);
    _assertNoPrefix(retryQueue);
  });
}); // _retryQueueName
describe('_retryQueuePolicyName', function () {
  // The result is split on the two-character sequence backslash-dot, not on a bare dot.
  const POLICY_SPLITTER = '\\.';

  it('names retry wildcard queue for ttl policy', function () {
    // Drop the leading name element and expect a trailing '.*' piece instead.
    const [, ...optionParts] = _expectedParts(['prefix', 'retrySuffix', 'queueSuffix']);
    const wanted = [...optionParts, '.*'];

    const policy = base._retryQueuePolicyName(name);

    _assertParts(policy, wanted, POLICY_SPLITTER);
  });

  it('covers no prefix', function () {
    const [, ...optionParts] = _expectedParts(['retrySuffix', 'queueSuffix']);
    // Clear the prefix so the generated name must not contain it.
    base.options.prefix = undefined;
    const wanted = [...optionParts, '.*'];

    const policy = base._retryQueuePolicyName(name);

    _assertParts(policy, wanted, POLICY_SPLITTER);
    _assertNoPrefix(policy, POLICY_SPLITTER);
  });
}); // _retryQueuePolicyName
292 }); // AMQP Plumbing Naming
describe('establishAMQPPlumbing', function () {
  // Connect against the stubbed amqp library, then stub close() so the failure
  // path can be observed through it.
  beforeEach(async function () {
    await base.connect();
    sinon.stub(base, 'close');
  });

  it('covers success', async function () {
    await base.establishAMQPPlumbing();

    const { assertExchange, assertQueue, bindQueue } = base.channel;
    assert.ok(assertExchange.called);
    assert.ok(assertQueue.called);
    assert.ok(bindQueue.called);
  });

  it('covers failure', async function () {
    base.channel.assertQueue.rejects(expectedException);

    const attempt = base.establishAMQPPlumbing();
    await assert.rejects(attempt, expectedException);

    // The failure must have resulted in a call to close().
    assert.ok(base.close.called);
  });
}); // establishAMQPPlumbing
312 describe('close', function () {
313 let channelRecoverStub
, channelCloseStub
, connectionCloseStub
;
314 beforeEach(async
function () {
315 await base
.connect();
316 channelRecoverStub
= base
.channel
.recover
;
317 channelCloseStub
= base
.channel
.close
;
318 connectionCloseStub
= base
.connection
.close
;
320 it('closes active connection and channel', async
function () {
322 assert(channelRecoverStub
.called
);
323 assert(channelCloseStub
.called
);
324 assert(connectionCloseStub
.called
);
326 it('covers no active channel', async
function () {
327 base
.channel
= undefined;
329 assert(channelRecoverStub
.notCalled
);
330 assert(channelCloseStub
.notCalled
);
331 assert(connectionCloseStub
.called
);
333 it('covers no active connection or channel', async
function () {
334 base
.channel
= undefined;
335 base
.connection
= undefined;
337 assert(channelRecoverStub
.notCalled
);
338 assert(channelCloseStub
.notCalled
);
339 assert(connectionCloseStub
.notCalled
);
341 it('covers failure', async
function () {
342 channelCloseStub
.rejects(expectedException
);
343 await assert
.rejects(base
.close(), expectedException
);
describe('health', function () {
  // health() is checked against an established (stubbed) connection.
  beforeEach(async function () {
    await base._establishAMQPConnection();
  });

  it('checks connection is writable', function () {
    const healthy = base.health();
    assert.strictEqual(healthy, true);
  });
}); // health
357 describe('policyCommand', function () {
358 it('covers', function () {
359 const result
= base
.policyCommand();