--- /dev/null
+'use strict';
+const Base = require('./base');
+const common = require('./common');
+
+const _fileScope = common.fileScope(__filename);
+
+class Publisher extends Base {
+ constructor(logger, options) {
+ super(logger, options);
+
+ this.keepSending = true;
+ this.drainQueue = [];
+ }
+
+ static publishDefaults = {
+ persistent: true,
+ };
+
+ /**
+ * Send a message to an exchange.
+ * @param {String} name
+ * @param {*} content
+ * @param {Object} options
+ */
+ async publish(name, content, options) {
+ const _scope = _fileScope('publish');
+
+ const exchangeName = this._exchangeName(name);
+
+ if (!(content instanceof Buffer)) {
+ content = Buffer.from(typeof(content) === 'object' ? JSON.stringify(content) : content);
+ }
+
+ const timestamp = (new Date()).getTime();
+ options = Object.assign({ timestamp }, Publisher.publishDefaults, options);
+
+ return new Promise((resolve, reject) => {
+ if (!this.keepSending) {
+ this.drainQueue.push({ exchangeName, content, options });
+ this.logger.debug(_scope, 'queued until drain', { exchangeName, queueLength: this.drainQueue.length });
+ return resolve(false);
+ }
+ this.keepSending = this.channel.publish(this._exchangeName(name), '', content, options, (err, ok) => {
+ if (err) {
+ this.logger.error(_scope, 'channel publish failed', { exchangeName, err });
+ return reject(err);
+ }
+ this.logger.debug(_scope, 'success', { exchangeName, options, ok });
+ return resolve(ok);
+ });
+ });
+ }
+
+ /**
+ * Resume publishing after channel has drained, sending any pending
+ * messages.
+ */
+ _eventAMQPChannelDrain() {
+ const _scope = _fileScope('_eventChannelDrain');
+ this.logger.debug(_scope, 'channel drain event', { queueLength: this.drainQueue.length });
+ this.keepSending = true;
+ this._publishDrainQueue();
+ }
+
+ /**
+ * Attempt to send any pending messages, similar to publish but without
+ * async behavior.
+ */
+ _publishDrainQueue() {
+ const _scope = _fileScope('publishDrainQueue');
+ while (this.keepSending && this.drainQueue.length) {
+ const { exchangeName, content, options } = this.drainQueue.shift();
+ this.keepSending = this.channel.publish(exchangeName, '', content, options, (err, ok) => {
+ if (err) {
+ this.logger.error(_scope, 'channel publish failed', { err, exchangeName, options });
+ return;
+ }
+ this.logger.debug(_scope, 'success', { exchangeName, options, ok });
+ });
+ }
+ }
+
+ /**
+ * Producer only needs to plumb the exchange it sends to.
+ *
+ * @param {String} name
+ */
+ async establishAMQPPlumbing(name) {
+ const _scope = _fileScope('establishAMQPPlumbing');
+
+ const exchangeOptions = {
+ durable: true, // Exchanges are permanent
+ };
+
+ const exchangeName = this._exchangeName(name);
+
+ this.logger.debug(_scope, 'plumbing', { exchangeName });
+
+ try {
+ const [ exchange ] = await Promise.all([
+ this.channel.assertExchange(exchangeName, this.options.exchangeType, exchangeOptions),
+ ]);
+ this.logger.debug(_scope, 'plumbing asserted', { exchange });
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, name });
+ await this.close();
+ throw e;
+ }
+ }
+
+}
+
+module.exports = Publisher;
\ No newline at end of file