2 const Base
= require('./base');
3 const { fileScope
} = require('@squeep/log-helper');
// File-level scope helper, created by calling fileScope with this file name.
// Each method below calls it with a label and passes the result as the first
// argument to this.logger.debug / this.logger.error.
5 const _fileScope
= fileScope(__filename
);
// Publisher extends Base and adds message publishing on top of it.
// It keeps a keepSending flag that gates whether publish() sends immediately
// or queues the message on this.drainQueue.
7 class Publisher
extends Base
{
// Constructor: logger and options are forwarded unchanged to the Base
// constructor.
8 constructor(logger
, options
) {
9 super(logger
, options
);
// Sending starts enabled. publish() and _publishDrainQueue() overwrite this
// flag with the return value of this.channel.publish, and
// _eventAMQPChannelDrain() sets it back to true.
// NOTE(review): this.drainQueue is used by the methods below, but its
// initialization is not visible in this excerpt -- confirm where it is set.
11 this.keepSending
= true;
// Static object of default publish options, referenced in publish() through
// the spread ...Publisher.publishDefaults.
// NOTE(review): the entries of this object are not visible in this excerpt.
15 static publishDefaults
= {
// publish(name, content, options): sends content to the exchange whose name is
// derived from name via this._exchangeName, and returns a Promise.
// When keepSending is false the message is pushed onto this.drainQueue and the
// Promise resolves with false instead of sending.
// NOTE(review): the doc-comment lines below are fragments; the comment
// delimiters and the content parameter line are not visible in this excerpt.
20 * Send a message to an exchange.
21 * @param {String} name
23 * @param {Object} options
25 async
publish(name
, content
, options
) {
26 const _scope
= _fileScope('publish');
28 const exchangeName
= this._exchangeName(name
);
// Anything that is not already a Buffer is converted with Buffer.from.
// Values whose typeof is object are JSON-stringified first; all other values
// are handed to Buffer.from as they are.
30 if (!(content
instanceof Buffer
)) {
31 content
= Buffer
.from(typeof(content
) === 'object' ? JSON
.stringify(content
) : content
);
// Current time in milliseconds since the epoch.
// NOTE(review): where timestamp is consumed is not visible in this excerpt;
// presumably it is merged into options together with the defaults below.
34 const timestamp
= (new Date()).getTime();
// Spread of the class-level defaults; the enclosing object literal is not
// visible in this excerpt.
37 ...Publisher
.publishDefaults
,
41 return new Promise((resolve
, reject
) => {
// Sending is currently disabled: remember the message for
// _publishDrainQueue(), log the queue length, and resolve with false.
42 if (!this.keepSending
) {
43 this.drainQueue
.push({ exchangeName
, content
, options
});
44 this.logger
.debug(_scope
, 'queued until drain', { exchangeName
, queueLength: this.drainQueue
.length
});
45 return resolve(false);
// Send now. The return value of this.channel.publish becomes the new
// keepSending flag, so a falsy return makes later publish() calls queue.
// The second argument is always the empty string.
// NOTE(review): presumably this is the amqplib confirm-channel signature
// publish(exchange, routingKey, content, options, callback) -- confirm.
// NOTE(review): this._exchangeName(name) is evaluated a second time here even
// though exchangeName above already holds the result of the same call.
47 this.keepSending
= this.channel
.publish(this._exchangeName(name
), '', content
, options
, (err
, ok
) => {
// Callback outcome logging: an error log for the failure case and a debug log
// for the success case.
// NOTE(review): the branching on err and the reject / resolve calls are not
// visible in this excerpt.
49 this.logger
.error(_scope
, 'channel publish failed', { exchangeName
, err
});
52 this.logger
.debug(_scope
, 'success', { exchangeName
, options
, ok
});
// _eventAMQPChannelDrain(): logs the current drainQueue length, re-enables
// sending, then calls _publishDrainQueue() to flush queued messages.
// NOTE(review): presumably registered as the handler for the channel drain
// event; the registration is not visible in this excerpt.
// NOTE(review): the doc-comment line below is a fragment that ends
// mid-sentence; its continuation is not visible in this excerpt.
59 * Resume publishing after channel has drained, sending any pending
62 _eventAMQPChannelDrain() {
// NOTE(review): the scope label passed here differs from the method name
// (no AMQP in the label) -- confirm whether that is intended.
63 const _scope
= _fileScope('_eventChannelDrain');
64 this.logger
.debug(_scope
, 'channel drain event', { queueLength: this.drainQueue
.length
});
// The flag must be set before the flush below, because the loop in
// _publishDrainQueue() only runs while keepSending is truthy.
65 this.keepSending
= true;
66 this._publishDrainQueue();
// _publishDrainQueue(): sends queued messages from the front of
// this.drainQueue for as long as keepSending stays truthy and the queue is not
// empty.
// NOTE(review): the doc-comment line below is a fragment that ends
// mid-sentence; its continuation is not visible in this excerpt.
70 * Attempt to send any pending messages, similar to publish but without
73 _publishDrainQueue() {
// NOTE(review): the scope label passed here lacks the leading underscore of
// the method name -- confirm whether that is intended.
74 const _scope
= _fileScope('publishDrainQueue');
75 while (this.keepSending
&& this.drainQueue
.length
) {
// Take the oldest queued entry, which has the same shape that publish() pushes.
76 const { exchangeName
, content
, options
} = this.drainQueue
.shift();
// As in publish(), the return value of this.channel.publish becomes the new
// keepSending flag; a falsy return ends the loop and leaves the remaining
// entries queued.
77 this.keepSending
= this.channel
.publish(exchangeName
, '', content
, options
, (err
, ok
) => {
// Callback outcome logging: an error log for the failure case and a debug log
// for the success case.
// NOTE(review): the branching on err is not visible in this excerpt.
79 this.logger
.error(_scope
, 'channel publish failed', { err
, exchangeName
, options
});
82 this.logger
.debug(_scope
, 'success', { exchangeName
, options
, ok
});
// establishAMQPPlumbing(name): async method that asserts the exchange derived
// from name on this.channel, using this.options.exchangeType as the exchange
// type, and logs before and after the assertion.
// NOTE(review): the doc-comment lines below are fragments; the comment
// delimiters and any further lines are not visible in this excerpt.
88 * Producer only needs to plumb the exchange it sends to.
90 * @param {String} name
92 async
establishAMQPPlumbing(name
) {
93 const _scope
= _fileScope('establishAMQPPlumbing');
// Options passed as the third argument of assertExchange.
// NOTE(review): any entries after durable are not visible in this excerpt.
95 const exchangeOptions
= {
96 durable: true, // Exchanges are permanent
99 const exchangeName
= this._exchangeName(name
);
101 this.logger
.debug(_scope
, 'plumbing', { exchangeName
});
// The assertion is awaited through Promise.all and the first result is bound
// to exchange; only one array element is visible in this excerpt.
104 const [ exchange
] = await Promise
.all([
105 this.channel
.assertExchange(exchangeName
, this.options
.exchangeType
, exchangeOptions
),
107 this.logger
.debug(_scope
, 'plumbing asserted', { exchange
});
// Failure logging with the caught error and the original name.
// NOTE(review): the try / catch framing that binds e, and whatever follows
// this log call (rethrow or return), are not visible in this excerpt.
109 this.logger
.error(_scope
, 'failed', { error: e
, name
});
// The Publisher class is the sole export of this module.
117 module
.exports
= Publisher
;