2 const Base
= require('./base');
3 const { fileScope
} = require('@squeep/log-helper');
// Scope-label helper built from this module's __filename. Each method below
// calls it with its own label and passes the result as the first argument to
// this.logger.debug / this.logger.error.
5 const _fileScope
= fileScope(__filename
);
// Publisher: sends messages to an exchange through this.channel, pausing and
// queueing on this.drainQueue whenever channel.publish() reports back-pressure.
// NOTE(review): this view of the file elides some lines (closing braces and
// possibly whole statements), so the comments describe only what is visible.
7 class Publisher
extends Base
{
// Forwards logger and options, unchanged, to the Base constructor.
8 constructor(logger
, options
) {
9 super(logger
, options
);
// Sending starts enabled. publish() and _publishDrainQueue() overwrite this
// flag with the return value of channel.publish(); _eventAMQPChannelDrain()
// sets it back to true.
11 this.keepSending
= true;
// Class-level default publish options; publish() spreads them
// (`...Publisher.publishDefaults`) while assembling its options.
// NOTE(review): the property list is not visible in this view.
15 static publishDefaults
= {
20 * @typedef {object} PublishOptions
21 * @property {number} timestamp timestamp in ms, defaults to current
22 * @property {boolean} persistent defaults to true
25 * Send a message to an exchange.
26 * @param {string} name name
27 * @param {*} content message content
28 * @param {PublishOptions} options options
29 * @returns {Promise<*>} resolves `false` immediately when the message is queued awaiting channel drain
// Publish `content` to the exchange derived from `name`.
// Returns a Promise. While sending is paused (keepSending is false) the message
// is parked on drainQueue and the Promise resolves `false` straight away.
// NOTE(review): several lines of this method are elided from this view (the
// options merge around the defaults spread and the branching inside the publish
// callback); comments below cover only the visible statements.
31 async
publish(name
, content
, options
) {
32 const _scope
= _fileScope('publish');
// Map the caller-supplied name to an exchange name via _exchangeName, which is
// defined outside this class body.
34 const exchangeName
= this._exchangeName(name
);
// Anything that is not already a Buffer is converted: values whose typeof is
// 'object' are JSON-serialized first, everything else is handed to Buffer.from
// unchanged.
// NOTE(review): Buffer.from throws a TypeError for numbers, booleans and
// undefined — confirm callers only pass strings, objects or Buffers.
36 if (!(content
instanceof Buffer
)) {
37 content
= Buffer
.from(typeof(content
) === 'object' ? JSON
.stringify(content
) : content
);
// Current time in milliseconds; presumably the default for options.timestamp —
// the lines that consume it are not visible here.
40 const timestamp
= Date
.now();
// Class-level defaults are spread into an object literal whose surrounding
// lines are elided from this view.
43 ...Publisher
.publishDefaults
,
// Adapt the callback-style channel.publish() to a Promise.
47 return new Promise((resolve
, reject
) => {
// Paused: queue the already-encoded message with its resolved exchange name and
// options, log the queue depth, and resolve `false` without touching the
// channel. _publishDrainQueue() later shifts these entries off the queue.
48 if (!this.keepSending
) {
49 this.drainQueue
.push({ exchangeName
, content
, options
});
50 this.logger
.debug(_scope
, 'queued until drain', { exchangeName
, queueLength: this.drainQueue
.length
});
51 return resolve(false);
// The return value of channel.publish() becomes the new keepSending flag. The
// second argument is an empty string (presumably the routing key — confirm
// against the channel API).
// NOTE(review): exchangeName was already computed above; this call recomputes
// it from `name`.
53 this.keepSending
= this.channel
.publish(this._exchangeName(name
), '', content
, options
, (err
, ok
) => {
// Logs the error handed to the publish callback; the condition guarding this
// line is not visible in this view.
55 this.logger
.error(_scope
, 'channel publish failed', { exchangeName
, err
});
// Logs the `ok` value handed to the publish callback together with the options
// that were used.
58 this.logger
.debug(_scope
, 'success', { exchangeName
, options
, ok
});
65 * Resume publishing after channel has drained, sending any pending
// Re-enables sending and flushes whatever was queued while it was paused.
// Presumably registered as the channel's drain-event handler (the log message
// says 'channel drain event'); the registration is not visible in this view.
// NOTE(review): the log scope label is '_eventChannelDrain', which differs from
// this method's name — confirm that is intended.
68 _eventAMQPChannelDrain() {
69 const _scope
= _fileScope('_eventChannelDrain');
// Record how many messages are waiting before the flush starts.
70 this.logger
.debug(_scope
, 'channel drain event', { queueLength: this.drainQueue
.length
});
// The flag must be true before flushing: _publishDrainQueue() loops only while
// keepSending is truthy.
71 this.keepSending
= true;
72 this._publishDrainQueue();
76 * Attempt to send any pending messages, similar to publish but without
// Sends queued messages in the order publish() queued them (push there, shift
// here) until the queue is empty or channel.publish() returns a falsy value.
// The visible callback lines only log; no Promise is settled here.
// NOTE(review): the log scope label is 'publishDrainQueue', without the
// method's leading underscore — confirm that is intended.
79 _publishDrainQueue() {
80 const _scope
= _fileScope('publishDrainQueue');
// Stop as soon as the channel signals back-pressure again (keepSending falsy)
// or nothing is left to send.
81 while (this.keepSending
&& this.drainQueue
.length
) {
// Take the oldest entry; it carries the exchange name, encoded content and
// options exactly as publish() stored them.
82 const { exchangeName
, content
, options
} = this.drainQueue
.shift();
// As in publish(), the return value of channel.publish() becomes the new
// keepSending flag, which is what ends the loop.
83 this.keepSending
= this.channel
.publish(exchangeName
, '', content
, options
, (err
, ok
) => {
// Logs the error handed to the publish callback; the condition guarding this
// line is not visible in this view.
85 this.logger
.error(_scope
, 'channel publish failed', { err
, exchangeName
, options
});
// Logs the `ok` value handed to the publish callback.
88 this.logger
.debug(_scope
, 'success', { exchangeName
, options
, ok
});
94 * Producer only needs to plumb the exchange it sends to.
95 * @param {string} name name
// Asserts the exchange derived from `name` on this.channel, using the exchange
// type from this.options.exchangeType, and logs the outcome.
// NOTE(review): the lines around the error log (presumably a try/catch binding
// `e`) are elided from this view, so whether the error is rethrown cannot be
// confirmed here.
97 async
establishAMQPPlumbing(name
) {
98 const _scope
= _fileScope('establishAMQPPlumbing');
// Options passed as the third argument to assertExchange.
100 const exchangeOptions
= {
101 durable: true, // Exchanges are permanent
// Map the caller-supplied name to an exchange name via _exchangeName.
104 const exchangeName
= this._exchangeName(name
);
106 this.logger
.debug(_scope
, 'plumbing', { exchangeName
});
// Awaited through Promise.all and destructured, although only one assertion is
// visible in the array here.
109 const [ exchange
] = await Promise
.all([
110 this.channel
.assertExchange(exchangeName
, this.options
.exchangeType
, exchangeOptions
),
// Logs the value resolved by assertExchange.
112 this.logger
.debug(_scope
, 'plumbing asserted', { exchange
});
// Logs the caught error together with the original (unmapped) name.
114 this.logger
.error(_scope
, 'failed', { error: e
, name
});
// CommonJS export: the module's value is the Publisher class itself.
122 module
.exports
= Publisher
;