initial commit
[squeep-amqp-helper] / lib / publisher.js
diff --git a/lib/publisher.js b/lib/publisher.js
new file mode 100644 (file)
index 0000000..453b575
--- /dev/null
@@ -0,0 +1,113 @@
+'use strict';
+const Base = require('./base');
+const common = require('./common');
+
+const _fileScope = common.fileScope(__filename);
+
+class Publisher extends Base {
+  constructor(logger, options) {
+    super(logger, options);
+
+    this.keepSending = true;
+    this.drainQueue = [];
+  }
+
+  static publishDefaults = {
+    persistent: true,
+  };
+
+  /**
+   * Send a message to an exchange.
+   * @param {String} name
+   * @param {*} content
+   * @param {Object} options
+   */
+  async publish(name, content, options) {
+    const _scope = _fileScope('publish');
+
+    const exchangeName = this._exchangeName(name);
+  
+    if (!(content instanceof Buffer)) {
+      content = Buffer.from(typeof(content) === 'object' ? JSON.stringify(content) : content);
+    }
+
+    const timestamp = (new Date()).getTime();
+    options = Object.assign({ timestamp }, Publisher.publishDefaults, options);
+
+    return new Promise((resolve, reject) => {
+      if (!this.keepSending) {
+        this.drainQueue.push({ exchangeName, content, options });
+        this.logger.debug(_scope, 'queued until drain', { exchangeName, queueLength: this.drainQueue.length });
+        return resolve(false);
+      }
+      this.keepSending = this.channel.publish(this._exchangeName(name), '', content, options, (err, ok) => {
+        if (err) {
+          this.logger.error(_scope, 'channel publish failed', { exchangeName, err });
+          return reject(err);
+        }
+        this.logger.debug(_scope, 'success', { exchangeName, options, ok });
+        return resolve(ok);
+      });
+    });
+  }
+
+  /**
+   * Resume publishing after channel has drained, sending any pending
+   * messages.
+   */
+  _eventAMQPChannelDrain() {
+    const _scope = _fileScope('_eventChannelDrain');
+    this.logger.debug(_scope, 'channel drain event', { queueLength: this.drainQueue.length });
+    this.keepSending = true;
+    this._publishDrainQueue();
+  }
+
+  /**
+   * Attempt to send any pending messages, similar to publish but without
+   * async behavior.
+   */
+  _publishDrainQueue() {
+    const _scope = _fileScope('publishDrainQueue');
+    while (this.keepSending && this.drainQueue.length) {
+      const { exchangeName, content, options } = this.drainQueue.shift();
+      this.keepSending = this.channel.publish(exchangeName, '', content, options, (err, ok) => {
+        if (err) {
+          this.logger.error(_scope, 'channel publish failed', { err, exchangeName, options });
+          return;
+        }
+        this.logger.debug(_scope, 'success', { exchangeName, options, ok });
+      });
+    }
+  }
+
+  /**
+   * Producer only needs to plumb the exchange it sends to.
+   * 
+   * @param {String} name
+   */
+  async establishAMQPPlumbing(name) {
+    const _scope = _fileScope('establishAMQPPlumbing');
+
+    const exchangeOptions = {
+      durable: true, // Exchanges are permanent
+    };
+
+    const exchangeName = this._exchangeName(name);
+
+    this.logger.debug(_scope, 'plumbing', { exchangeName });
+
+    try {
+      const [ exchange ] = await Promise.all([
+        this.channel.assertExchange(exchangeName, this.options.exchangeType, exchangeOptions),
+      ]);
+      this.logger.debug(_scope, 'plumbing asserted', { exchange });
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, name });
+      await this.close();
+      throw e;
+    }
+  }
+  
+}
+
+module.exports = Publisher;
\ No newline at end of file