2 /* eslint-disable node/no-unpublished-require */
4 const assert
= require('assert');
5 const sinon
= require('sinon');
6 const Consumer
= require('../../lib/consumer');
7 const amqp
= require('amqplib');
8 const { StubLogger
} = require('@squeep/test-helper');
10 describe('Consumer', function () {
11 let logger
, options
, consumer
;
12 const expectedException
= new Error('oh no');
// Suite-level per-test fixture: creates a fresh StubLogger, replaces
// amqp.connect with a sinon stub, and constructs the Consumer under test.
// NOTE(review): several lines of this hook are not visible in this view
// (e.g. where `options` is opened and where the literals/callback close).
14 beforeEach(function () {
15 logger
= new StubLogger()
// presumably a member of the `options` object handed to Consumer below — confirm
18 url: 'amqp://user:password@rabbitmq.int:5672',
// amqp.connect resolves a fake connection object; its createConfirmChannel
// resolves a fake channel whose visible members are all sinon stubs.
20 sinon
.stub(amqp
, 'connect').resolves({
21 createConfirmChannel: sinon
.stub().resolves({
// consume resolves an object carrying a consumerTag
23 consume: sinon
.stub().resolves({ consumerTag: 'xyz' }),
24 assertExchange: sinon
.stub(),
25 assertQueue: sinon
.stub(),
26 bindQueue: sinon
.stub(),
// checkQueue resolves an empty object by default; the 'covers missing queue'
// case in the consume suite overrides it to resolve undefined.
27 checkQueue: sinon
.stub().resolves({}),
30 prefetch: sinon
.stub(),
// the Consumer under test is built from the stub logger and `options`
35 consumer
= new Consumer(logger
, options
);
// Suite-level per-test teardown hook.
// NOTE(review): the body is not visible in this view; presumably it restores
// the sinon stubs installed by the beforeEach hook — confirm.
38 afterEach(function () {
/**
 * Tests for the handlers the consumer runs when the AMQP connection or
 * channel closes. Each test starts from placeholder connection state and
 * checks what the handler resets and which logger method it calls.
 */
describe('AMQP Event Handlers', function () {
  // Seed non-empty state so a handler that fails to reset it is detected.
  beforeEach(function () {
    consumer.connection = {};
    consumer.channel = {};
    consumer.queueConsumerTags = { 'consumer': 'placeholder' };
  });

  describe('_eventAMQPConnectionClose', function () {
    it('clears consumer tags', function () {
      consumer._eventAMQPConnectionClose();

      // Connection close is expected to drop both the connection and the
      // channel, empty the consumer-tag map, and log at error level.
      assert.strictEqual(consumer.connection, undefined);
      assert.strictEqual(consumer.channel, undefined);
      assert.deepStrictEqual(consumer.queueConsumerTags, {});
      assert(consumer.logger.error.called);
    });
  }); // _eventAMQPConnectionClose

  describe('_eventAMQPChannelClose', function () {
    it('clears consumer tags', function () {
      consumer._eventAMQPChannelClose();

      // Channel close is only checked for emptying the consumer-tag map and
      // logging at debug level; connection/channel fields are not asserted.
      assert.deepStrictEqual(consumer.queueConsumerTags, {});
      assert(consumer.logger.debug.called);
    });
  }); // _eventAMQPChannelClose
});// AMQP Event Handlers
/**
 * Tests for Consumer#connect with the AMQP setup (`_connectAMQP`) and the
 * teardown (`close`) replaced by sinon stubs, so no broker is contacted.
 */
describe('connect', function () {
  beforeEach(function () {
    sinon.stub(consumer, '_connectAMQP');
    sinon.stub(consumer, 'close');
  });

  it('covers success', async function () {
    await consumer.connect();
    assert(consumer._connectAMQP.called);
  });

  it('covers failure', async function () {
    consumer._connectAMQP.rejects(expectedException);

    // connect() is expected to propagate the setup error to the caller.
    await assert.rejects(consumer.connect(), expectedException);

    // Only _connectAMQP is stubbed in this suite, so it is the only setup
    // step whose `.called` flag can be asserted meaningfully.
    assert(consumer._connectAMQP.called);
    // On failure the consumer is expected to close itself and log the error.
    assert(consumer.close.called);
    assert(consumer.logger.error.called);
  });
}); // connect
// Suite for Consumer#_messageConsumerFactory: builds a message consumer around
// a sinon-stubbed handler and checks whether channel.nack is invoked.
84 describe('_messageConsumerFactory', function () {
// NOTE(review): no assignment to `message` is visible in this view; it may be
// set on a line that is not shown, otherwise it is passed as undefined — confirm.
85 let messageConsumer
, messageHandler
, message
;
// Connects through the stubbed amqp module, then wraps a fresh handler stub.
86 beforeEach(async
function () {
87 await consumer
._connectAMQP();
88 messageHandler
= sinon
.stub();
89 messageConsumer
= consumer
._messageConsumerFactory(messageHandler
);
// Handler does not reject: the message must not be nack'd.
// NOTE(review): `nack` is not among the channel stubs visible in the
// suite-level fixture; presumably it is defined on a line not shown — confirm.
92 it('covers success', async
function () {
93 await
messageConsumer(message
);
94 assert(consumer
.channel
.nack
.notCalled
);
// Handler rejects: the message consumer is awaited without throwing and the
// message is expected to be nack'd.
96 it('covers failure', async
function () {
97 messageHandler
.rejects(expectedException
);
98 await
messageConsumer(message
);
99 assert(consumer
.channel
.nack
.called
);
101 }); // _messageConsumerFactory
// Suite for Consumer#consume, run against the stubbed channel created by the
// suite-level fixture.
// NOTE(review): this block is truncated in this view (no closing lines), and
// no assignment to `consumeOptions` or `prefetch` is visible — confirm they
// are set on lines not shown.
103 describe('consume', function () {
104 let messageHandler
, consumeOptions
, prefetch
;
// Fresh handler stub and an established (stubbed) AMQP connection per test.
105 beforeEach(async
function () {
106 messageHandler
= sinon
.stub();
109 await consumer
._connectAMQP();
// Happy path: channel.consume is invoked.
111 it('covers success', async
function () {
112 await consumer
.consume(messageHandler
, consumeOptions
);
113 assert(consumer
.channel
.consume
.called
);
// Same call with the optional third argument supplied; no assertion is
// visible for this case beyond the call not rejecting.
115 it('covers optional prefetch', async
function () {
116 await consumer
.consume(messageHandler
, consumeOptions
, prefetch
);
// checkQueue resolving undefined (instead of the fixture's `{}`) must make
// consume() reject before channel.consume is reached.
118 it('covers missing queue', async
function () {
119 consumer
.channel
.checkQueue
.resolves();
120 await assert
.rejects(consumer
.consume(messageHandler
, consumeOptions
));
121 assert(consumer
.channel
.consume
.notCalled
);
// channel.consume rejecting must surface the same error from consume().
// NOTE(review): this case passes the suite-level `options` (the Consumer
// constructor options) where the other cases pass `consumeOptions` —
// confirm this is intentional.
123 it('covers failure', async
function () {
124 consumer
.channel
.consume
.rejects(expectedException
);
125 await assert
.rejects(consumer
.consume(messageHandler
, options
), expectedException
);