--- /dev/null
+/* eslint-env mocha */
+/* eslint-disable node/no-unpublished-require */
+'use strict';
+
+const assert = require('assert');
+const sinon = require('sinon');
+const amqp = require('amqplib');
+const Publisher = require('../../lib/publisher');
+const { StubLogger } = require('@squeep/test-helper');
+
+describe('Publisher', function () {
+ let publisher, logger, options, name;
+ const expectedException = new Error('oh no');
+
+ beforeEach(function () {
+ logger = new StubLogger()
+ logger._reset();
+ options = {
+ amqp: {
+ url: 'amqp://user:password@rabbitmq.int:5672',
+ },
+ };
+ sinon.stub(amqp, 'connect').resolves({
+ createConfirmChannel: sinon.stub().resolves({
+ on: sinon.stub(),
+ checkQueue: sinon.stub(),
+ assertExchange: sinon.stub(),
+ assertQueue: sinon.stub(),
+ bindQueue: sinon.stub(),
+ prefetch: sinon.stub(),
+ consume: sinon.stub(),
+ recover: sinon.stub(),
+ close: sinon.stub(),
+ }),
+ connection: {
+ stream: {
+ writable: true,
+ },
+ },
+ on: sinon.stub(),
+ close: sinon.stub(),
+ });
+ publisher = new Publisher(logger, options);
+ publisher.channel = {
+ publish: sinon.stub().returns(true),
+ recover: sinon.stub(),
+ close: sinon.stub(),
+ };
+ name = 'name';
+ });
+
+ function publishSuccess(_exchange, _routing, _content, _options, cb) {
+ cb(null, {});
+ return true;
+ }
+
+ function publishFailure(_exchange, _routing, _content, _options, cb) {
+ cb(expectedException, null);
+ return true;
+ }
+
+ function publishSuccessFull(_exchange, _routing, _content, _options, cb) {
+ cb(null, {});
+ return false;
+ }
+
+ function publishFailureFull(_exchange, _routing, _content, _options, cb) {
+ cb(expectedException, null);
+ return false;
+ }
+
+ afterEach(function () {
+ sinon.restore();
+ });
+
+ describe('publish', function () {
+ let content;
+ beforeEach(function () {
+ content = Buffer.from('test');
+ });
+ it('covers success', async function () {
+ publisher.channel.publish = sinon.spy(publishSuccess);
+ await publisher.publish(name, content);
+ assert.strictEqual(publisher.keepSending, true);
+ });
+ it('covers failure', async function () {
+ publisher.channel.publish = sinon.spy(publishFailure);
+ await assert.rejects(publisher.publish(name, content), expectedException);
+ assert.strictEqual(publisher.keepSending, true);
+ });
+ it('covers success, stream full', async function () {
+ publisher.channel.publish = sinon.spy(publishSuccessFull);
+ await publisher.publish(name, content);
+ assert.strictEqual(publisher.keepSending, false);
+ });
+ it('covers failure, stream full', async function () {
+ publisher.channel.publish = sinon.spy(publishFailureFull);
+ await assert.rejects(publisher.publish(name, content), expectedException);
+ assert.strictEqual(publisher.keepSending, false);
+ });
+ it('covers non-buffer content', async function () {
+ content = 'text';
+ publisher.channel.publish = sinon.spy(publishSuccess);
+ await publisher.publish(name, content);
+ assert.strictEqual(publisher.keepSending, true);
+ });
+ it('covers object content', async function () {
+ content = { foo: 'bar' };
+ publisher.channel.publish = sinon.spy(publishSuccess);
+ await publisher.publish(name, content);
+ assert.strictEqual(publisher.keepSending, true);
+ });
+ it('queues content when stream full', async function () {
+ publisher.keepSending = false;
+ publisher.channel.publish = sinon.spy(publishSuccess);
+ await publisher.publish(name, content);
+ assert.strictEqual(publisher.keepSending, false);
+ assert.strictEqual(publisher.drainQueue.length, 1);
+ });
+ }); // publish
+
+ describe('_eventAMQPChannelDrain', function () {
+ beforeEach(function () {
+ sinon.stub(publisher, '_publishDrainQueue');
+ });
+ it('resets keepSending and tries to drain the queue', function () {
+ publisher._eventAMQPChannelDrain();
+ assert.strictEqual(publisher.keepSending, true);
+ assert(publisher._publishDrainQueue.called);
+ });
+ }); // _eventAMQPChannelDrain
+
+ describe('_publishDrainQueue', function () {
+ it('sends queued contents', function () {
+ publisher.drainQueue.push({ content: Buffer.from('xyz'), options: {} });
+ publisher.channel.publish = sinon.spy(publishSuccess);
+ publisher._publishDrainQueue();
+ assert(publisher.channel.publish.called);
+ assert.strictEqual(publisher.drainQueue.length, 0);
+ });
+ it('covers publish failure', function () {
+ publisher.drainQueue.push({ content: Buffer.from('xyz'), options: {} });
+ publisher.channel.publish = sinon.spy(publishFailure);
+ publisher._publishDrainQueue();
+ assert.strictEqual(publisher.drainQueue.length, 0);
+ });
+ }); // _publishDrainQueue
+
+ describe('establishAMQPPlumbing', function () {
+ beforeEach(async function () {
+ await publisher.connect();
+ sinon.stub(publisher, 'close');
+ });
+ it('covers success', async function () {
+ await publisher.establishAMQPPlumbing();
+ assert(publisher.channel.assertExchange.called);
+ });
+ it('covers failure', async function () {
+ publisher.channel.assertExchange.rejects(expectedException);
+ await assert.rejects(publisher.establishAMQPPlumbing(), expectedException);
+ assert(publisher.close.called);
+ });
+ }); // establishAMQPPlumbing
+
+}); // Publisher
\ No newline at end of file