X-Git-Url: http://git.squeep.com/?p=squeep-amqp-helper;a=blobdiff_plain;f=test%2Flib%2Fconsumer.js;fp=test%2Flib%2Fconsumer.js;h=79132f57ce215e3f90de8c3072d0313c2f982b17;hp=0000000000000000000000000000000000000000;hb=174280d3f44ba13dac0b26d42d968189a4f4fa93;hpb=67905316ada5ee4668306506705f4ee2a5f407f0 diff --git a/test/lib/consumer.js b/test/lib/consumer.js new file mode 100644 index 0000000..79132f5 --- /dev/null +++ b/test/lib/consumer.js @@ -0,0 +1,129 @@ +/* eslint-env mocha */ +/* eslint-disable node/no-unpublished-require */ +'use strict'; +const assert = require('assert'); +const sinon = require('sinon'); +const Consumer = require('../../lib/consumer'); +const amqp = require('amqplib'); +const { StubLogger } = require('@squeep/test-helper'); + +describe('Consumer', function () { + let logger, options, consumer; + const expectedException = new Error('oh no'); + + beforeEach(function () { + logger = new StubLogger() + logger._reset(); + options = { + url: 'amqp://user:password@rabbitmq.int:5672', + }; + sinon.stub(amqp, 'connect').resolves({ + createConfirmChannel: sinon.stub().resolves({ + ack: sinon.stub(), + consume: sinon.stub().resolves({ consumerTag: 'xyz' }), + assertExchange: sinon.stub(), + assertQueue: sinon.stub(), + bindQueue: sinon.stub(), + checkQueue: sinon.stub().resolves({}), + nack: sinon.stub(), + on: sinon.stub(), + prefetch: sinon.stub(), + }), + on: sinon.stub(), + close: sinon.stub(), + }); + consumer = new Consumer(logger, options); + }); + + afterEach(function () { + sinon.restore(); + }); + + describe('AMQP Event Handlers', function () { + beforeEach(function () { + consumer.connection = {}; + consumer.channel = {}; + consumer.queueConsumerTags = { 'consumer': 'placeholder' }; + }); + describe('_eventAMQPConnectionClose', function () { + it('clears consumer tags', function () { + consumer._eventAMQPConnectionClose(); + assert.strictEqual(consumer.connection, undefined); + assert.strictEqual(consumer.channel, undefined); + assert.deepStrictEqual(consumer.queueConsumerTags, {}); + assert(consumer.logger.error.called); + }); + }); // _eventAMQPConnectionClose + describe('_eventAMQPChannelClose', function () { + it('clears consumer tags', function () { + consumer._eventAMQPChannelClose(); + assert.deepStrictEqual(consumer.queueConsumerTags, {}); + assert(consumer.logger.debug.called); + }); + }); // _eventAMQPChannelClose + });// AMQP Event Handlers + + describe('connect', function () { + beforeEach(function () { + sinon.stub(consumer, '_connectAMQP'); + sinon.stub(consumer, 'close'); + }); + it('covers success', async function () { + await consumer.connect(); + assert(consumer._connectAMQP.called); + }); + it('covers failure', async function () { + consumer._connectAMQP.rejects(expectedException); + await assert.rejects(consumer.connect(), expectedException); + assert(consumer._connectAMQP.called || consumer._connectRedlock.called); + assert(consumer.close.called); + assert(consumer.logger.error.called); + }); + }); // connect + + describe('_messageConsumerFactory', function () { + let messageConsumer, messageHandler, message; + beforeEach(async function () { + await consumer._connectAMQP(); + messageHandler = sinon.stub(); + messageConsumer = consumer._messageConsumerFactory(messageHandler); + message = 'foop'; + }); + it('covers success', async function () { + await messageConsumer(message); + assert(consumer.channel.nack.notCalled); + }); + it('covers failure', async function () { + messageHandler.rejects(expectedException); + await messageConsumer(message); + assert(consumer.channel.nack.called); + }); + }); // _messageConsumerFactory + + describe('consume', function () { + let messageHandler, consumeOptions, prefetch; + beforeEach(async function () { + messageHandler = sinon.stub(); + consumeOptions = {}; + prefetch = 1; + await consumer._connectAMQP(); + }); + it('covers success', async function () { + await consumer.consume(messageHandler, consumeOptions); + assert(consumer.channel.consume.called); + }); + it('covers optional prefetch', async function () { + await consumer.consume(messageHandler, consumeOptions, prefetch); + }); + it('covers missing queue', async function () { + consumer.channel.checkQueue.resolves(); + await assert.rejects(consumer.consume(messageHandler, consumeOptions)); + assert(consumer.channel.consume.notCalled); + }); + it('covers failure', async function () { + consumer.channel.consume.rejects(expectedException); + await assert.rejects(consumer.consume(messageHandler, options), expectedException); + }); + }); // consume + +}); // Consumer \ No newline at end of file