2 /* eslint-disable node/no-unpublished-require */
5 const assert
= require('assert');
6 const sinon
= require('sinon');
7 const amqp
= require('amqplib');
8 const Publisher
= require('../../lib/publisher');
9 const { StubLogger
} = require('@squeep/test-helper');
11 describe('Publisher', function () {
// Suite-wide fixtures; these start out undefined and are assigned elsewhere in the suite.
let publisher;
let logger;
let options;
let name;

// Shared sentinel error that the failure-path tests compare rejections against.
const expectedException = new Error('oh no');
// Per-test setup hook for the whole suite.
// NOTE(review): several lines of this hook are not visible in this view, so the
// comments below describe only the statements that can actually be seen.
15 beforeEach(function () {
// Fresh stub logger for every test.
16 logger
= new StubLogger()
// Broker URL entry of an object literal whose opening line is not visible —
// presumably the `options` passed to Publisher below; confirm.
20 url: 'amqp://user:password@rabbitmq.int:5672',
// Replace amqp.connect with a stub that resolves to a fake connection object.
23 sinon
.stub(amqp
, 'connect').resolves({
// The fake connection's createConfirmChannel stub resolves to an object whose
// channel methods are all bare sinon stubs.
24 createConfirmChannel: sinon
.stub().resolves({
26 checkQueue: sinon
.stub(),
27 assertExchange: sinon
.stub(),
28 assertQueue: sinon
.stub(),
29 bindQueue: sinon
.stub(),
30 prefetch: sinon
.stub(),
31 consume: sinon
.stub(),
32 recover: sinon
.stub(),
// Construct the Publisher under test from the stub logger and the options.
43 publisher
= new Publisher(logger
, options
);
// Entries of another object literal whose opening line is not visible —
// presumably a channel object installed on the publisher; confirm.
45 publish: sinon
.stub().returns(true),
46 recover: sinon
.stub(),
// Stand-in for channel.publish; tests in this file install it with sinon.spy(publishSuccess).
// Takes the exchange, routing key, content, options and a completion callback `cb`.
// NOTE(review): the body is not visible in this view — presumably it invokes `cb`
// without an error and reports that sending may continue; confirm.
52 function publishSuccess(_exchange
, _routing
, _content
, _options
, cb
) {
// Stand-in for channel.publish that reports a failure: it invokes the completion
// callback with the shared expectedException and a null result.
// NOTE(review): any return statement is not visible in this view; confirm what it returns.
57 function publishFailure(_exchange
, _routing
, _content
, _options
, cb
) {
58 cb(expectedException
, null);
// Stand-in for channel.publish used by the 'covers success, stream full' test, which
// expects publisher.keepSending to be false afterwards.
// NOTE(review): the body is not visible in this view — presumably it invokes `cb`
// without an error and returns false; confirm.
62 function publishSuccessFull(_exchange
, _routing
, _content
, _options
, cb
) {
// Stand-in for channel.publish used by the 'covers failure, stream full' test: it invokes
// the completion callback with the shared expectedException and a null result.
// NOTE(review): any return statement is not visible in this view — presumably it returns
// false, since that test expects keepSending to be false; confirm.
67 function publishFailureFull(_exchange
, _routing
, _content
, _options
, cb
) {
68 cb(expectedException
, null);
// Per-test teardown hook.
// NOTE(review): the body is not visible in this view — presumably it restores the sinon stubs; confirm.
72 afterEach(function () {
76 describe('publish', function () {
// Reset the payload to a small Buffer before every test in this group.
beforeEach(function () {
  content = Buffer.from('test', 'utf8');
});
// Publishes through a spy wrapping publishSuccess and expects keepSending to be true afterwards.
it('covers success', async function () {
  const publishSpy = sinon.spy(publishSuccess);
  publisher.channel.publish = publishSpy;

  await publisher.publish(name, content);

  const { keepSending } = publisher;
  assert.strictEqual(keepSending, true);
});
// Publishes through a spy wrapping publishFailure: the call must reject with the shared
// sentinel error, and keepSending is expected to be true afterwards.
it('covers failure', async function () {
  const publishSpy = sinon.spy(publishFailure);
  publisher.channel.publish = publishSpy;

  const pending = publisher.publish(name, content);
  await assert.rejects(pending, expectedException);

  const { keepSending } = publisher;
  assert.strictEqual(keepSending, true);
});
// Publishes through a spy wrapping publishSuccessFull: the call resolves, and
// keepSending is expected to be false afterwards.
it('covers success, stream full', async function () {
  const publishSpy = sinon.spy(publishSuccessFull);
  publisher.channel.publish = publishSpy;

  await publisher.publish(name, content);

  const { keepSending } = publisher;
  assert.strictEqual(keepSending, false);
});
// Publishes through a spy wrapping publishFailureFull: the call must reject with the
// shared sentinel error, and keepSending is expected to be false afterwards.
it('covers failure, stream full', async function () {
  const publishSpy = sinon.spy(publishFailureFull);
  publisher.channel.publish = publishSpy;

  const pending = publisher.publish(name, content);
  await assert.rejects(pending, expectedException);

  const { keepSending } = publisher;
  assert.strictEqual(keepSending, false);
});
// Publishes a payload that is not a Buffer and expects the publish to succeed with
// keepSending still true.
it('covers non-buffer content', async function () {
  // Without this assignment the test would reuse the Buffer set by the enclosing
  // beforeEach and never exercise the non-buffer path its title promises.
  content = 'test';
  publisher.channel.publish = sinon.spy(publishSuccess);

  await publisher.publish(name, content);

  assert.strictEqual(publisher.keepSending, true);
});
// Publishes a plain object payload through a spy wrapping publishSuccess and expects
// keepSending to be true afterwards.
it('covers object content', async function () {
  content = { foo: 'bar' };
  const publishSpy = sinon.spy(publishSuccess);
  publisher.channel.publish = publishSpy;

  await publisher.publish(name, content);

  const { keepSending } = publisher;
  assert.strictEqual(keepSending, true);
});
// With keepSending forced to false before publishing, the test expects keepSending to
// stay false and exactly one entry to be present in drainQueue.
it('queues content when stream full', async function () {
  publisher.keepSending = false;
  const publishSpy = sinon.spy(publishSuccess);
  publisher.channel.publish = publishSpy;

  await publisher.publish(name, content);

  const { keepSending, drainQueue } = publisher;
  assert.strictEqual(keepSending, false);
  assert.strictEqual(drainQueue.length, 1);
});
// Tests for the drain event handler, with the actual queue flush stubbed out.
describe('_eventAMQPChannelDrain', function () {
  beforeEach(function () {
    // Stub the flush so only the handler's own behaviour is observed.
    sinon.stub(publisher, '_publishDrainQueue');
  });

  it('resets keepSending and tries to drain the queue', function () {
    publisher._eventAMQPChannelDrain();

    const { keepSending, _publishDrainQueue: drainStub } = publisher;
    assert.strictEqual(keepSending, true);
    assert.ok(drainStub.called);
  });
}); // _eventAMQPChannelDrain
// Tests for flushing drainQueue through the channel.
describe('_publishDrainQueue', function () {
  it('sends queued contents', function () {
    const queued = { content: Buffer.from('xyz'), options: {} };
    publisher.drainQueue.push(queued);
    publisher.channel.publish = sinon.spy(publishSuccess);

    publisher._publishDrainQueue();

    // The queued entry must have gone through channel.publish and left the queue.
    assert.ok(publisher.channel.publish.called);
    const remaining = publisher.drainQueue.length;
    assert.strictEqual(remaining, 0);
  });

  it('covers publish failure', function () {
    const queued = { content: Buffer.from('xyz'), options: {} };
    publisher.drainQueue.push(queued);
    publisher.channel.publish = sinon.spy(publishFailure);

    publisher._publishDrainQueue();

    // The queue is expected to be empty even though the publish stand-in reports an error.
    const remaining = publisher.drainQueue.length;
    assert.strictEqual(remaining, 0);
  });
}); // _publishDrainQueue
// Tests for establishAMQPPlumbing, run against a connected publisher whose close() is stubbed.
describe('establishAMQPPlumbing', function () {
  beforeEach(async function () {
    await publisher.connect();
    // Stub close() so the failure test can check whether it was invoked.
    sinon.stub(publisher, 'close');
  });

  it('covers success', async function () {
    await publisher.establishAMQPPlumbing();

    const { assertExchange } = publisher.channel;
    assert.ok(assertExchange.called);
  });

  it('covers failure', async function () {
    // Make the stubbed assertExchange reject with the shared sentinel error.
    publisher.channel.assertExchange.rejects(expectedException);

    const pending = publisher.establishAMQPPlumbing();
    await assert.rejects(pending, expectedException);

    assert.ok(publisher.close.called);
  });
}); // establishAMQPPlumbing