initial commit
[squeep-amqp-helper] / lib / base.js
diff --git a/lib/base.js b/lib/base.js
new file mode 100644 (file)
index 0000000..87607da
--- /dev/null
@@ -0,0 +1,300 @@
+'use strict';
+const amqp = require('amqplib');
+const { isFatalError: isFatalAMQPError } = require('amqplib/lib/connection');
+const common = require('./common');
+
+const _fileScope = common.fileScope(__filename);
+
+/**
+ * Base class common to worker publisher and consumer, handling
+ * the AMQP setup.
+ */
+
+class Base {
+  /**
+   * @param {Console} logger
+   * @param {Object} options
+   * @param {String} options.url
+   * @param {Number=} options.prefetch
+   * @param {Object=} options.socketOptions
+   * @param {Boolean=} options.socketOptions.noDelay
+   * @param {Number=} options.socketOptions.timeout
+   * @param {Boolean=} options.socketOptions.keepAlive
+   * @param {Number=} options.socketOptions.keepAliveDelay
+   * @param {Object=} options.socketOptions.clientProperties
+   */
+  constructor(logger, options) {
+    this.logger = logger;
+    this.options = Object.assign({
+      url: undefined,
+      name: 'messages',
+      prefix: 'squeep',
+      queueSuffix: 'queue',
+      retrySuffix: 'retry',
+      exchangeType: 'direct',
+      queueType: 'quorum',
+      retryDelayMs: 10000,
+      prefetch: 1,
+    }, options);
+    this.options.socketOptions = Object.assign({
+      noDelay: undefined,
+      timeout: undefined,
+      keepAlive: undefined,
+      keepAliveDelay: undefined,
+      clientProperties: undefined,
+    }, options.socketOptions);
+
+    this.connection = undefined;
+    this.channel = undefined;
+  }
+
+
+  /**
+   * Establish the necessary connections to the queue and lock services.
+   */
+  async connect() {
+    const _scope = _fileScope('connect');
+    this.logger.debug(_scope, 'called');
+
+    try {
+      await this._connectAMQP();
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e });
+      await this.close();
+      throw e;
+    }
+  }
+
+  async _connectAMQP() {
+    await this._establishAMQPConnection();
+    await this._establishAMQPChannel();
+  }
+
+  async _establishAMQPConnection() {
+    const _scope = _fileScope('_establishConnection');
+    const { url, socketOptions } = this.options;
+    try {
+      this.connection = await amqp.connect(url, socketOptions);
+      this.connection.on('close', this._eventAMQPConnectionClose.bind(this));
+      this.connection.on('error', this._eventAMQPConnectionError.bind(this));
+      this.connection.on('blocked', this._eventAMQPConnectionBlocked.bind(this));
+      this.connection.on('unblocked', this._eventAMQPConnectionUnblocked.bind(this));
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e });
+      throw e;
+    }
+  }
+
+  _eventAMQPConnectionClose(err) {
+    const _scope = _fileScope('_eventConnectionClose');
+    this.connection = undefined;
+    this.channel = undefined;
+    const isFatal = isFatalAMQPError(err);
+    const _log = isFatal ? this.logger.error : this.logger.debug;
+    _log(_scope, 'connection close event', { err });
+  }
+
+  _eventAMQPConnectionError(err) {
+    const _scope = _fileScope('_eventConnectionError');
+    this.logger.error(_scope, 'connection error event', { err });
+  }
+
+  _eventAMQPConnectionBlocked(reason) {
+    const _scope = _fileScope('_eventAMQPConnectionBlocked');
+    this.logger.debug(_scope, 'connection blocked event', { reason });
+  }
+
+  _eventAMQPConnectionUnblocked() {
+    const _scope = _fileScope('_eventAMQPConnectionUnblocked');
+    this.logger.debug(_scope, 'connection unblocked event');
+  }
+
+  async _establishAMQPChannel() {
+    const _scope = _fileScope('_establishChannel');
+    try {
+      this.channel = await this.connection.createConfirmChannel();
+      this.channel.on('close', this._eventAMQPChannelClose.bind(this));
+      this.channel.on('error', this._eventAMQPChannelError.bind(this));
+      this.channel.on('return', this._eventAMQPChannelReturn.bind(this));
+      this.channel.on('drain', this._eventAMQPChannelDrain.bind(this));  
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e });
+      throw e;
+    }
+  }
+
+  _eventAMQPChannelClose(err) {
+    const _scope = _fileScope('_eventChannelClose');
+    this.channel = undefined;
+    const isFatal = err && isFatalAMQPError(err);
+    const _log = isFatal ? this.logger.error : this.logger.debug;
+    _log(_scope, 'channel close event', { err });
+  }
+
+  _eventAMQPChannelError(err) {
+    const _scope = _fileScope('_eventChannelError');
+    this.logger.error(_scope, 'channel error event', { err });
+  }
+
+  _eventAMQPChannelReturn(msg) {
+    const _scope = _fileScope('_eventChannelReturn');
+    this.logger.error(_scope, 'channel return event', { msg });
+  }
+
+  _eventAMQPChannelDrain() {
+    const _scope = _fileScope('_eventChannelDrain');
+    this.logger.debug(_scope, 'channel drain event');
+  }
+
+  _exchangeName(name) {
+    return [
+      ...(this.options.prefix && [this.options.prefix] || []),
+      name,
+    ].join('.');
+  }
+
+  _retryExchangeName(name) {
+    return [
+      ...(this.options.prefix && [this.options.prefix] || []),
+      name,
+      this.options.retrySuffix,
+    ].join('.');
+  }
+
+  _queueName(name) {
+    return [
+      ...(this.options.prefix && [this.options.prefix] || []),
+      name,
+      this.options.queueSuffix,
+    ].join('.');
+  }
+
+  _retryQueueName(name) {
+    return [
+      ...(this.options.prefix && [this.options.prefix] || []),
+      name,
+      this.options.retrySuffix,
+      this.options.queueSuffix,
+    ].join('.');
+  }
+
+  _retryQueuePolicyName(name) {
+    return [
+      ...(this.options.prefix && [this.options.prefix] || []),
+      ...((name || '').split('.').map(() => '.*')),
+      this.options.retrySuffix,
+      this.options.queueSuffix,
+    ].join('\\.');
+  }
+
+  /**
+   * Note:
+   * - RabbitMQ does not currently support creating quorum queues with message-ttl.
+   * - amqplib does not provide an API to set a ttl policy on the retry queue
+   * @see {@link policyCommand}
+   * 
+   * @param {String} name
+   */
+  async establishAMQPPlumbing(name) {
+    const _scope = _fileScope('establishAMQPPlumbing');
+
+    const exchangeOptions = {
+      durable: true, // Exchanges are permanent
+    };
+
+    const queueOptions = {
+      arguments: {
+        'x-queue-type': this.options.queueType, // quorum for fault-tolerance and data-safety
+        'x-dead-letter-exchange': this._retryExchangeName(name),
+      },
+      durable: true, // Queues are permanent
+      noAck: false, // Queues expect explicit acknowledgement of delivery
+    };
+
+    const queueRetryOptions = {
+      arguments: {
+        'x-queue-type': this.options.queueType,
+        'x-dead-letter-exchange': this._exchangeName(name),
+        // 'x-message-ttl': workerRetryDelayMs, // Cannot currently set on quorum queues, define a policy instead. (No api for that??)
+      },
+      durable: true,
+      noAck: false,
+    };
+
+    const exchangeName = this._exchangeName(name);
+    const retryExchangeName = this._retryExchangeName(name);
+    const queueName = this._queueName(name);
+    const retryQueueName = this._retryQueueName(name);
+
+    this.logger.debug(_scope, 'plumbing', { exchangeName, retryExchangeName, queueName, retryQueueName });
+
+    try {
+      const [ exchange, retryExchange, queue, retryQueue ] = await Promise.all([
+        this.channel.assertExchange(exchangeName, this.options.exchangeType, exchangeOptions),
+        this.channel.assertExchange(retryExchangeName, this.options.exchangeType, exchangeOptions),
+        this.channel.assertQueue(queueName, queueOptions),
+        this.channel.assertQueue(retryQueueName, queueRetryOptions),
+      ]);
+      const [ queueBinding, retryQueueBinding ] = await Promise.all([
+        this.channel.bindQueue(queueName, exchangeName, ''),
+        this.channel.bindQueue(retryQueueName, retryExchangeName, ''),
+      ]);
+      this.logger.debug(_scope, 'plumbing asserted', { exchange, retryExchange, queue, retryQueue, queueBinding, retryQueueBinding });
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, name });
+      await this.close();
+      throw e;
+    }
+  }
+
+  /**
+   * Shutdown the channel and connection.
+   */
+  async close() {
+    const _scope = _fileScope('close');
+    this.logger.debug(_scope, 'called');
+
+    try {
+      if (this.channel) {
+        await this.channel.recover();
+        await this.channel.close();
+      }
+
+      if (this.connection) {
+        await this.connection.close();
+      }
+      this.logger.debug(_scope, 'closed');
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e });
+      throw e;
+    }
+  }
+
+  /**
+   * Minimal health-check, connection is writable.
+   * @returns {Boolean}
+   */
+  health() {
+    return !!this?.connection?.connection?.stream?.writable;
+  }
+
+  /**
+   * Generate an example cli command to create a retry-queue policy.
+   */
+  policyCommand(name) {
+    const settings = {
+      'message-ttl': this.options.retryDelayMs,
+    };
+    return [
+      'rabbitmqctl',
+      'set_policy',
+      'RetryQueueTTL',
+      `'${this._retryQueuePolicyName(name)}'`,
+      `'${JSON.stringify(settings)}'`,
+      '--apply-to queues',
+    ].join(' ');
+  }
+
+}
+
+module.exports = Base;
\ No newline at end of file