initial commit
[squeep-amqp-helper] / test / lib / publisher.js
1 /* eslint-env mocha */
2 /* eslint-disable node/no-unpublished-require */
3 'use strict';
4
5 const assert = require('assert');
6 const sinon = require('sinon');
7 const amqp = require('amqplib');
8 const Publisher = require('../../lib/publisher');
9 const { StubLogger } = require('@squeep/test-helper');
10
11 describe('Publisher', function () {
12 let publisher, logger, options, name;
13 const expectedException = new Error('oh no');
14
15 beforeEach(function () {
16 logger = new StubLogger()
17 logger._reset();
18 options = {
19 amqp: {
20 url: 'amqp://user:password@rabbitmq.int:5672',
21 },
22 };
23 sinon.stub(amqp, 'connect').resolves({
24 createConfirmChannel: sinon.stub().resolves({
25 on: sinon.stub(),
26 checkQueue: sinon.stub(),
27 assertExchange: sinon.stub(),
28 assertQueue: sinon.stub(),
29 bindQueue: sinon.stub(),
30 prefetch: sinon.stub(),
31 consume: sinon.stub(),
32 recover: sinon.stub(),
33 close: sinon.stub(),
34 }),
35 connection: {
36 stream: {
37 writable: true,
38 },
39 },
40 on: sinon.stub(),
41 close: sinon.stub(),
42 });
43 publisher = new Publisher(logger, options);
44 publisher.channel = {
45 publish: sinon.stub().returns(true),
46 recover: sinon.stub(),
47 close: sinon.stub(),
48 };
49 name = 'name';
50 });
51
52 function publishSuccess(_exchange, _routing, _content, _options, cb) {
53 cb(null, {});
54 return true;
55 }
56
57 function publishFailure(_exchange, _routing, _content, _options, cb) {
58 cb(expectedException, null);
59 return true;
60 }
61
62 function publishSuccessFull(_exchange, _routing, _content, _options, cb) {
63 cb(null, {});
64 return false;
65 }
66
67 function publishFailureFull(_exchange, _routing, _content, _options, cb) {
68 cb(expectedException, null);
69 return false;
70 }
71
72 afterEach(function () {
73 sinon.restore();
74 });
75
76 describe('publish', function () {
77 let content;
78 beforeEach(function () {
79 content = Buffer.from('test');
80 });
81 it('covers success', async function () {
82 publisher.channel.publish = sinon.spy(publishSuccess);
83 await publisher.publish(name, content);
84 assert.strictEqual(publisher.keepSending, true);
85 });
86 it('covers failure', async function () {
87 publisher.channel.publish = sinon.spy(publishFailure);
88 await assert.rejects(publisher.publish(name, content), expectedException);
89 assert.strictEqual(publisher.keepSending, true);
90 });
91 it('covers success, stream full', async function () {
92 publisher.channel.publish = sinon.spy(publishSuccessFull);
93 await publisher.publish(name, content);
94 assert.strictEqual(publisher.keepSending, false);
95 });
96 it('covers failure, stream full', async function () {
97 publisher.channel.publish = sinon.spy(publishFailureFull);
98 await assert.rejects(publisher.publish(name, content), expectedException);
99 assert.strictEqual(publisher.keepSending, false);
100 });
101 it('covers non-buffer content', async function () {
102 content = 'text';
103 publisher.channel.publish = sinon.spy(publishSuccess);
104 await publisher.publish(name, content);
105 assert.strictEqual(publisher.keepSending, true);
106 });
107 it('covers object content', async function () {
108 content = { foo: 'bar' };
109 publisher.channel.publish = sinon.spy(publishSuccess);
110 await publisher.publish(name, content);
111 assert.strictEqual(publisher.keepSending, true);
112 });
113 it('queues content when stream full', async function () {
114 publisher.keepSending = false;
115 publisher.channel.publish = sinon.spy(publishSuccess);
116 await publisher.publish(name, content);
117 assert.strictEqual(publisher.keepSending, false);
118 assert.strictEqual(publisher.drainQueue.length, 1);
119 });
120 }); // publish
121
122 describe('_eventAMQPChannelDrain', function () {
123 beforeEach(function () {
124 sinon.stub(publisher, '_publishDrainQueue');
125 });
126 it('resets keepSending and tries to drain the queue', function () {
127 publisher._eventAMQPChannelDrain();
128 assert.strictEqual(publisher.keepSending, true);
129 assert(publisher._publishDrainQueue.called);
130 });
131 }); // _eventAMQPChannelDrain
132
133 describe('_publishDrainQueue', function () {
134 it('sends queued contents', function () {
135 publisher.drainQueue.push({ content: Buffer.from('xyz'), options: {} });
136 publisher.channel.publish = sinon.spy(publishSuccess);
137 publisher._publishDrainQueue();
138 assert(publisher.channel.publish.called);
139 assert.strictEqual(publisher.drainQueue.length, 0);
140 });
141 it('covers publish failure', function () {
142 publisher.drainQueue.push({ content: Buffer.from('xyz'), options: {} });
143 publisher.channel.publish = sinon.spy(publishFailure);
144 publisher._publishDrainQueue();
145 assert.strictEqual(publisher.drainQueue.length, 0);
146 });
147 }); // _publishDrainQueue
148
149 describe('establishAMQPPlumbing', function () {
150 beforeEach(async function () {
151 await publisher.connect();
152 sinon.stub(publisher, 'close');
153 });
154 it('covers success', async function () {
155 await publisher.establishAMQPPlumbing();
156 assert(publisher.channel.assertExchange.called);
157 });
158 it('covers failure', async function () {
159 publisher.channel.assertExchange.rejects(expectedException);
160 await assert.rejects(publisher.establishAMQPPlumbing(), expectedException);
161 assert(publisher.close.called);
162 });
163 }); // establishAMQPPlumbing
164
165 }); // Publisher