X-Git-Url: http://git.squeep.com/?p=squeep-amqp-helper;a=blobdiff_plain;f=test%2Flib%2Fpublisher.js;fp=test%2Flib%2Fpublisher.js;h=9d6d1972585e568c3d2474e24c0df12f5732cbb8;hp=0000000000000000000000000000000000000000;hb=174280d3f44ba13dac0b26d42d968189a4f4fa93;hpb=67905316ada5ee4668306506705f4ee2a5f407f0 diff --git a/test/lib/publisher.js b/test/lib/publisher.js new file mode 100644 index 0000000..9d6d197 --- /dev/null +++ b/test/lib/publisher.js @@ -0,0 +1,165 @@ +/* eslint-env mocha */ +/* eslint-disable node/no-unpublished-require */ +'use strict'; + +const assert = require('assert'); +const sinon = require('sinon'); +const amqp = require('amqplib'); +const Publisher = require('../../lib/publisher'); +const { StubLogger } = require('@squeep/test-helper'); + +describe('Publisher', function () { + let publisher, logger, options, name; + const expectedException = new Error('oh no'); + + beforeEach(function () { + logger = new StubLogger() + logger._reset(); + options = { + amqp: { + url: 'amqp://user:password@rabbitmq.int:5672', + }, + }; + sinon.stub(amqp, 'connect').resolves({ + createConfirmChannel: sinon.stub().resolves({ + on: sinon.stub(), + checkQueue: sinon.stub(), + assertExchange: sinon.stub(), + assertQueue: sinon.stub(), + bindQueue: sinon.stub(), + prefetch: sinon.stub(), + consume: sinon.stub(), + recover: sinon.stub(), + close: sinon.stub(), + }), + connection: { + stream: { + writable: true, + }, + }, + on: sinon.stub(), + close: sinon.stub(), + }); + publisher = new Publisher(logger, options); + publisher.channel = { + publish: sinon.stub().returns(true), + recover: sinon.stub(), + close: sinon.stub(), + }; + name = 'name'; + }); + + function publishSuccess(_exchange, _routing, _content, _options, cb) { + cb(null, {}); + return true; + } + + function publishFailure(_exchange, _routing, _content, _options, cb) { + cb(expectedException, null); + return true; + } + + function publishSuccessFull(_exchange, _routing, _content, _options, cb) { + cb(null, {}); + return false; + } + + function publishFailureFull(_exchange, _routing, _content, _options, cb) { + cb(expectedException, null); + return false; + } + + afterEach(function () { + sinon.restore(); + }); + + describe('publish', function () { + let content; + beforeEach(function () { + content = Buffer.from('test'); + }); + it('covers success', async function () { + publisher.channel.publish = sinon.spy(publishSuccess); + await publisher.publish(name, content); + assert.strictEqual(publisher.keepSending, true); + }); + it('covers failure', async function () { + publisher.channel.publish = sinon.spy(publishFailure); + await assert.rejects(publisher.publish(name, content), expectedException); + assert.strictEqual(publisher.keepSending, true); + }); + it('covers success, stream full', async function () { + publisher.channel.publish = sinon.spy(publishSuccessFull); + await publisher.publish(name, content); + assert.strictEqual(publisher.keepSending, false); + }); + it('covers failure, stream full', async function () { + publisher.channel.publish = sinon.spy(publishFailureFull); + await assert.rejects(publisher.publish(name, content), expectedException); + assert.strictEqual(publisher.keepSending, false); + }); + it('covers non-buffer content', async function () { + content = 'text'; + publisher.channel.publish = sinon.spy(publishSuccess); + await publisher.publish(name, content); + assert.strictEqual(publisher.keepSending, true); + }); + it('covers object content', async function () { + content = { foo: 'bar' }; + publisher.channel.publish = sinon.spy(publishSuccess); + await publisher.publish(name, content); + assert.strictEqual(publisher.keepSending, true); + }); + it('queues content when stream full', async function () { + publisher.keepSending = false; + publisher.channel.publish = sinon.spy(publishSuccess); + await publisher.publish(name, content); + assert.strictEqual(publisher.keepSending, false); + assert.strictEqual(publisher.drainQueue.length, 1); + }); + }); // publish + + describe('_eventAMQPChannelDrain', function () { + beforeEach(function () { + sinon.stub(publisher, '_publishDrainQueue'); + }); + it('resets keepSending and tries to drain the queue', function () { + publisher._eventAMQPChannelDrain(); + assert.strictEqual(publisher.keepSending, true); + assert(publisher._publishDrainQueue.called); + }); + }); // _eventAMQPChannelDrain + + describe('_publishDrainQueue', function () { + it('sends queued contents', function () { + publisher.drainQueue.push({ content: Buffer.from('xyz'), options: {} }); + publisher.channel.publish = sinon.spy(publishSuccess); + publisher._publishDrainQueue(); + assert(publisher.channel.publish.called); + assert.strictEqual(publisher.drainQueue.length, 0); + }); + it('covers publish failure', function () { + publisher.drainQueue.push({ content: Buffer.from('xyz'), options: {} }); + publisher.channel.publish = sinon.spy(publishFailure); + publisher._publishDrainQueue(); + assert.strictEqual(publisher.drainQueue.length, 0); + }); + }); // _publishDrainQueue + + describe('establishAMQPPlumbing', function () { + beforeEach(async function () { + await publisher.connect(); + sinon.stub(publisher, 'close'); + }); + it('covers success', async function () { + await publisher.establishAMQPPlumbing(); + assert(publisher.channel.assertExchange.called); + }); + it('covers failure', async function () { + publisher.channel.assertExchange.rejects(expectedException); + await assert.rejects(publisher.establishAMQPPlumbing(), expectedException); + assert(publisher.close.called); + }); + }); // establishAMQPPlumbing + +}); // Publisher \ No newline at end of file