initial commit
[squeep-amqp-helper] / test / lib / publisher.js
diff --git a/test/lib/publisher.js b/test/lib/publisher.js
new file mode 100644 (file)
index 0000000..9d6d197
--- /dev/null
@@ -0,0 +1,165 @@
+/* eslint-env mocha */
+/* eslint-disable node/no-unpublished-require */
+'use strict';
+
+const assert = require('assert');
+const sinon = require('sinon');
+const amqp = require('amqplib');
+const Publisher = require('../../lib/publisher');
+const { StubLogger } = require('@squeep/test-helper');
+
+describe('Publisher', function () {
+  let publisher, logger, options, name;
+  const expectedException = new Error('oh no');
+
+  beforeEach(function () {
+    logger = new StubLogger()
+    logger._reset();
+    options = {
+      amqp: {
+        url: 'amqp://user:password@rabbitmq.int:5672',
+      },
+    };
+    sinon.stub(amqp, 'connect').resolves({
+      createConfirmChannel: sinon.stub().resolves({
+        on: sinon.stub(),
+        checkQueue: sinon.stub(),
+        assertExchange: sinon.stub(),
+        assertQueue: sinon.stub(),
+        bindQueue: sinon.stub(),
+        prefetch: sinon.stub(),
+        consume: sinon.stub(),
+        recover: sinon.stub(),
+        close: sinon.stub(),
+      }),
+      connection: {
+        stream: {
+          writable: true,
+        },
+      },
+      on: sinon.stub(),
+      close: sinon.stub(),
+    });
+    publisher = new Publisher(logger, options);
+    publisher.channel = {
+      publish: sinon.stub().returns(true),
+      recover: sinon.stub(),
+      close: sinon.stub(),
+    };
+    name = 'name';
+  });
+
+  function publishSuccess(_exchange, _routing, _content, _options, cb) {
+    cb(null, {});
+    return true;
+  }
+
+  function publishFailure(_exchange, _routing, _content, _options, cb) {
+    cb(expectedException, null);
+    return true;
+  }
+
+  function publishSuccessFull(_exchange, _routing, _content, _options, cb) {
+    cb(null, {});
+    return false;
+  }
+
+  function publishFailureFull(_exchange, _routing, _content, _options, cb) {
+    cb(expectedException, null);
+    return false;
+  }
+
+  afterEach(function () {
+    sinon.restore();
+  });
+
+  describe('publish', function () {
+    let content;
+    beforeEach(function () {
+      content = Buffer.from('test');
+    });
+    it('covers success', async function () {
+      publisher.channel.publish = sinon.spy(publishSuccess);
+      await publisher.publish(name, content);
+      assert.strictEqual(publisher.keepSending, true);
+    });
+    it('covers failure', async function () {
+      publisher.channel.publish = sinon.spy(publishFailure);
+      await assert.rejects(publisher.publish(name, content), expectedException);
+      assert.strictEqual(publisher.keepSending, true);
+    });
+    it('covers success, stream full', async function () {
+      publisher.channel.publish = sinon.spy(publishSuccessFull);
+      await publisher.publish(name, content);
+      assert.strictEqual(publisher.keepSending, false);
+    });
+    it('covers failure, stream full', async function () {
+      publisher.channel.publish = sinon.spy(publishFailureFull);
+      await assert.rejects(publisher.publish(name, content), expectedException);
+      assert.strictEqual(publisher.keepSending, false);
+    });
+    it('covers non-buffer content', async function () {
+      content = 'text';
+      publisher.channel.publish = sinon.spy(publishSuccess);
+      await publisher.publish(name, content);
+      assert.strictEqual(publisher.keepSending, true);
+    });
+    it('covers object content', async function () {
+      content = { foo: 'bar' };
+      publisher.channel.publish = sinon.spy(publishSuccess);
+      await publisher.publish(name, content);
+      assert.strictEqual(publisher.keepSending, true);
+    });
+    it('queues content when stream full', async function () {
+      publisher.keepSending = false;
+      publisher.channel.publish = sinon.spy(publishSuccess);
+      await publisher.publish(name, content);
+      assert.strictEqual(publisher.keepSending, false);
+      assert.strictEqual(publisher.drainQueue.length, 1);
+    });
+  }); // publish
+
+  describe('_eventAMQPChannelDrain', function () {
+    beforeEach(function () {
+      sinon.stub(publisher, '_publishDrainQueue');
+    });
+    it('resets keepSending and tries to drain the queue', function () {
+      publisher._eventAMQPChannelDrain();
+      assert.strictEqual(publisher.keepSending, true);
+      assert(publisher._publishDrainQueue.called);
+    });
+  }); // _eventAMQPChannelDrain
+
+  describe('_publishDrainQueue', function () {
+    it('sends queued contents', function () {
+      publisher.drainQueue.push({ content: Buffer.from('xyz'), options: {} });
+      publisher.channel.publish = sinon.spy(publishSuccess);
+      publisher._publishDrainQueue();
+      assert(publisher.channel.publish.called);
+      assert.strictEqual(publisher.drainQueue.length, 0);
+    });
+    it('covers publish failure', function () {
+      publisher.drainQueue.push({ content: Buffer.from('xyz'), options: {} });
+      publisher.channel.publish = sinon.spy(publishFailure);
+      publisher._publishDrainQueue();
+      assert.strictEqual(publisher.drainQueue.length, 0);
+    });
+  }); // _publishDrainQueue
+
+  describe('establishAMQPPlumbing', function () {
+    beforeEach(async function () {
+      await publisher.connect();
+      sinon.stub(publisher, 'close');
+    });
+    it('covers success', async function () {
+      await publisher.establishAMQPPlumbing();
+      assert(publisher.channel.assertExchange.called);
+    });
+    it('covers failure', async function () {
+      publisher.channel.assertExchange.rejects(expectedException);
+      await assert.rejects(publisher.establishAMQPPlumbing(), expectedException);
+      assert(publisher.close.called);
+    });
+  }); // establishAMQPPlumbing
+
+}); // Publisher
\ No newline at end of file