2 const Base
= require('./base');
3 const common
= require('./common');
5 const _fileScope
= common
.fileScope(__filename
);
7 class Publisher
extends Base
{
8 constructor(logger
, options
) {
9 super(logger
, options
);
11 this.keepSending
= true;
15 static publishDefaults
= {
20 * Send a message to an exchange.
21 * @param {String} name
23 * @param {Object} options
25 async
publish(name
, content
, options
) {
26 const _scope
= _fileScope('publish');
28 const exchangeName
= this._exchangeName(name
);
30 if (!(content
instanceof Buffer
)) {
31 content
= Buffer
.from(typeof(content
) === 'object' ? JSON
.stringify(content
) : content
);
34 const timestamp
= (new Date()).getTime();
35 options
= Object
.assign({ timestamp
}, Publisher
.publishDefaults
, options
);
37 return new Promise((resolve
, reject
) => {
38 if (!this.keepSending
) {
39 this.drainQueue
.push({ exchangeName
, content
, options
});
40 this.logger
.debug(_scope
, 'queued until drain', { exchangeName
, queueLength: this.drainQueue
.length
});
41 return resolve(false);
43 this.keepSending
= this.channel
.publish(this._exchangeName(name
), '', content
, options
, (err
, ok
) => {
45 this.logger
.error(_scope
, 'channel publish failed', { exchangeName
, err
});
48 this.logger
.debug(_scope
, 'success', { exchangeName
, options
, ok
});
55 * Resume publishing after channel has drained, sending any pending
58 _eventAMQPChannelDrain() {
59 const _scope
= _fileScope('_eventChannelDrain');
60 this.logger
.debug(_scope
, 'channel drain event', { queueLength: this.drainQueue
.length
});
61 this.keepSending
= true;
62 this._publishDrainQueue();
66 * Attempt to send any pending messages, similar to publish but without
69 _publishDrainQueue() {
70 const _scope
= _fileScope('publishDrainQueue');
71 while (this.keepSending
&& this.drainQueue
.length
) {
72 const { exchangeName
, content
, options
} = this.drainQueue
.shift();
73 this.keepSending
= this.channel
.publish(exchangeName
, '', content
, options
, (err
, ok
) => {
75 this.logger
.error(_scope
, 'channel publish failed', { err
, exchangeName
, options
});
78 this.logger
.debug(_scope
, 'success', { exchangeName
, options
, ok
});
84 * Producer only needs to plumb the exchange it sends to.
86 * @param {String} name
88 async
establishAMQPPlumbing(name
) {
89 const _scope
= _fileScope('establishAMQPPlumbing');
91 const exchangeOptions
= {
92 durable: true, // Exchanges are permanent
95 const exchangeName
= this._exchangeName(name
);
97 this.logger
.debug(_scope
, 'plumbing', { exchangeName
});
100 const [ exchange
] = await Promise
.all([
101 this.channel
.assertExchange(exchangeName
, this.options
.exchangeType
, exchangeOptions
),
103 this.logger
.debug(_scope
, 'plumbing asserted', { exchange
});
105 this.logger
.error(_scope
, 'failed', { error: e
, name
});
113 module
.exports
= Publisher
;