initial commit
[squeep-amqp-helper] / test / lib / consumer.js
1 /* eslint-env mocha */
2 /* eslint-disable node/no-unpublished-require */
3 'use strict';
4 const assert = require('assert');
5 const sinon = require('sinon');
6 const Consumer = require('../../lib/consumer');
7 const amqp = require('amqplib');
8 const { StubLogger } = require('@squeep/test-helper');
9
/**
 * Unit tests for the Consumer class exported by lib/consumer.
 * amqplib's connect() is replaced with a sinon stub for every test, so the
 * suite only ever interacts with fake connection and channel objects.
 */
describe('Consumer', function () {
  let logger, options, consumer;
  const expectedException = new Error('oh no');

  beforeEach(function () {
    logger = new StubLogger();
    logger._reset();
    options = {
      url: 'amqp://user:password@rabbitmq.int:5672',
    };
    // Fake connection: createConfirmChannel() resolves to a fake channel whose
    // methods are all stubs, so individual tests can override them as needed.
    sinon.stub(amqp, 'connect').resolves({
      createConfirmChannel: sinon.stub().resolves({
        ack: sinon.stub(),
        consume: sinon.stub().resolves({ consumerTag: 'xyz' }),
        assertExchange: sinon.stub(),
        assertQueue: sinon.stub(),
        bindQueue: sinon.stub(),
        checkQueue: sinon.stub().resolves({}),
        nack: sinon.stub(),
        on: sinon.stub(),
        prefetch: sinon.stub(),
      }),
      on: sinon.stub(),
      close: sinon.stub(),
    });
    consumer = new Consumer(logger, options);
  });

  afterEach(function () {
    // Undo every stub installed above (and any added inside a test).
    sinon.restore();
  });

  describe('AMQP Event Handlers', function () {
    beforeEach(function () {
      // Seed placeholder state so the handlers have something to clear.
      consumer.connection = {};
      consumer.channel = {};
      consumer.queueConsumerTags = { 'consumer': 'placeholder' };
    });

    describe('_eventAMQPConnectionClose', function () {
      it('clears consumer tags', function () {
        consumer._eventAMQPConnectionClose();
        // Expect connection and channel to be dropped, tags emptied, and an
        // error-level log entry emitted.
        assert.strictEqual(consumer.connection, undefined);
        assert.strictEqual(consumer.channel, undefined);
        assert.deepStrictEqual(consumer.queueConsumerTags, {});
        assert(consumer.logger.error.called);
      });
    }); // _eventAMQPConnectionClose

    describe('_eventAMQPChannelClose', function () {
      it('clears consumer tags', function () {
        consumer._eventAMQPChannelClose();
        // Expect tags emptied and a debug-level log entry emitted.
        assert.deepStrictEqual(consumer.queueConsumerTags, {});
        assert(consumer.logger.debug.called);
      });
    }); // _eventAMQPChannelClose
  }); // AMQP Event Handlers

  describe('connect', function () {
    beforeEach(function () {
      // Isolate connect() from the real connection and teardown logic.
      sinon.stub(consumer, '_connectAMQP');
      sinon.stub(consumer, 'close');
    });

    it('covers success', async function () {
      await consumer.connect();
      assert(consumer._connectAMQP.called);
    });

    it('covers failure', async function () {
      consumer._connectAMQP.rejects(expectedException);
      await assert.rejects(consumer.connect(), expectedException);
      // A failed connect is expected to propagate the error, close the
      // consumer, and log at error level.
      assert(consumer._connectAMQP.called);
      assert(consumer.close.called);
      assert(consumer.logger.error.called);
    });
  }); // connect

  describe('_messageConsumerFactory', function () {
    let messageConsumer, messageHandler, message;

    beforeEach(async function () {
      // Populate consumer.channel with the stubbed channel before building
      // the per-message callback under test.
      await consumer._connectAMQP();
      messageHandler = sinon.stub();
      messageConsumer = consumer._messageConsumerFactory(messageHandler);
      message = 'foop';
    });

    it('covers success', async function () {
      await messageConsumer(message);
      assert(consumer.channel.nack.notCalled);
    });

    it('covers failure', async function () {
      // A rejecting handler is expected to result in the message being nacked.
      messageHandler.rejects(expectedException);
      await messageConsumer(message);
      assert(consumer.channel.nack.called);
    });
  }); // _messageConsumerFactory

  describe('consume', function () {
    let messageHandler, consumeOptions, prefetch;

    beforeEach(async function () {
      messageHandler = sinon.stub();
      consumeOptions = {};
      prefetch = 1;
      await consumer._connectAMQP();
    });

    it('covers success', async function () {
      await consumer.consume(messageHandler, consumeOptions);
      assert(consumer.channel.consume.called);
    });

    it('covers optional prefetch', async function () {
      // NOTE(review): this only verifies that consume() resolves when a
      // prefetch value is supplied; consider asserting on channel.prefetch
      // once the expected call is confirmed against the implementation.
      await consumer.consume(messageHandler, consumeOptions, prefetch);
    });

    it('covers missing queue', async function () {
      // checkQueue resolving with no value stands in for an absent queue;
      // consume() must reject without ever subscribing.
      consumer.channel.checkQueue.resolves();
      await assert.rejects(consumer.consume(messageHandler, consumeOptions));
      assert(consumer.channel.consume.notCalled);
    });

    it('covers failure', async function () {
      consumer.channel.consume.rejects(expectedException);
      // Pass the per-test consume options, consistent with the other cases,
      // rather than the Consumer constructor options.
      await assert.rejects(consumer.consume(messageHandler, consumeOptions), expectedException);
    });
  }); // consume

}); // Consumer