--- /dev/null
+'use strict';
+const amqp = require('amqplib');
+const { isFatalError: isFatalAMQPError } = require('amqplib/lib/connection');
+const common = require('./common');
+
+const _fileScope = common.fileScope(__filename);
+
+/**
+ * Base class common to worker publisher and consumer, handling
+ * the AMQP setup.
+ */
+
+class Base {
+ /**
+ * @param {Console} logger
+ * @param {Object} options
+ * @param {String} options.url
+ * @param {Number=} options.prefetch
+ * @param {Object=} options.socketOptions
+ * @param {Boolean=} options.socketOptions.noDelay
+ * @param {Number=} options.socketOptions.timeout
+ * @param {Boolean=} options.socketOptions.keepAlive
+ * @param {Number=} options.socketOptions.keepAliveDelay
+ * @param {Object=} options.socketOptions.clientProperties
+ */
+ constructor(logger, options) {
+ this.logger = logger;
+ this.options = Object.assign({
+ url: undefined,
+ name: 'messages',
+ prefix: 'squeep',
+ queueSuffix: 'queue',
+ retrySuffix: 'retry',
+ exchangeType: 'direct',
+ queueType: 'quorum',
+ retryDelayMs: 10000,
+ prefetch: 1,
+ }, options);
+ this.options.socketOptions = Object.assign({
+ noDelay: undefined,
+ timeout: undefined,
+ keepAlive: undefined,
+ keepAliveDelay: undefined,
+ clientProperties: undefined,
+ }, options.socketOptions);
+
+ this.connection = undefined;
+ this.channel = undefined;
+ }
+
+
+ /**
+ * Establish the necessary connections to the queue and lock services.
+ */
+ async connect() {
+ const _scope = _fileScope('connect');
+ this.logger.debug(_scope, 'called');
+
+ try {
+ await this._connectAMQP();
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e });
+ await this.close();
+ throw e;
+ }
+ }
+
+ async _connectAMQP() {
+ await this._establishAMQPConnection();
+ await this._establishAMQPChannel();
+ }
+
+ async _establishAMQPConnection() {
+ const _scope = _fileScope('_establishConnection');
+ const { url, socketOptions } = this.options;
+ try {
+ this.connection = await amqp.connect(url, socketOptions);
+ this.connection.on('close', this._eventAMQPConnectionClose.bind(this));
+ this.connection.on('error', this._eventAMQPConnectionError.bind(this));
+ this.connection.on('blocked', this._eventAMQPConnectionBlocked.bind(this));
+ this.connection.on('unblocked', this._eventAMQPConnectionUnblocked.bind(this));
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e });
+ throw e;
+ }
+ }
+
+ _eventAMQPConnectionClose(err) {
+ const _scope = _fileScope('_eventConnectionClose');
+ this.connection = undefined;
+ this.channel = undefined;
+ const isFatal = isFatalAMQPError(err);
+ const _log = isFatal ? this.logger.error : this.logger.debug;
+ _log(_scope, 'connection close event', { err });
+ }
+
+ _eventAMQPConnectionError(err) {
+ const _scope = _fileScope('_eventConnectionError');
+ this.logger.error(_scope, 'connection error event', { err });
+ }
+
+ _eventAMQPConnectionBlocked(reason) {
+ const _scope = _fileScope('_eventAMQPConnectionBlocked');
+ this.logger.debug(_scope, 'connection blocked event', { reason });
+ }
+
+ _eventAMQPConnectionUnblocked() {
+ const _scope = _fileScope('_eventAMQPConnectionUnblocked');
+ this.logger.debug(_scope, 'connection unblocked event');
+ }
+
+ async _establishAMQPChannel() {
+ const _scope = _fileScope('_establishChannel');
+ try {
+ this.channel = await this.connection.createConfirmChannel();
+ this.channel.on('close', this._eventAMQPChannelClose.bind(this));
+ this.channel.on('error', this._eventAMQPChannelError.bind(this));
+ this.channel.on('return', this._eventAMQPChannelReturn.bind(this));
+ this.channel.on('drain', this._eventAMQPChannelDrain.bind(this));
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e });
+ throw e;
+ }
+ }
+
+ _eventAMQPChannelClose(err) {
+ const _scope = _fileScope('_eventChannelClose');
+ this.channel = undefined;
+ const isFatal = err && isFatalAMQPError(err);
+ const _log = isFatal ? this.logger.error : this.logger.debug;
+ _log(_scope, 'channel close event', { err });
+ }
+
+ _eventAMQPChannelError(err) {
+ const _scope = _fileScope('_eventChannelError');
+ this.logger.error(_scope, 'channel error event', { err });
+ }
+
+ _eventAMQPChannelReturn(msg) {
+ const _scope = _fileScope('_eventChannelReturn');
+ this.logger.error(_scope, 'channel return event', { msg });
+ }
+
+ _eventAMQPChannelDrain() {
+ const _scope = _fileScope('_eventChannelDrain');
+ this.logger.debug(_scope, 'channel drain event');
+ }
+
+ _exchangeName(name) {
+ return [
+ ...(this.options.prefix && [this.options.prefix] || []),
+ name,
+ ].join('.');
+ }
+
+ _retryExchangeName(name) {
+ return [
+ ...(this.options.prefix && [this.options.prefix] || []),
+ name,
+ this.options.retrySuffix,
+ ].join('.');
+ }
+
+ _queueName(name) {
+ return [
+ ...(this.options.prefix && [this.options.prefix] || []),
+ name,
+ this.options.queueSuffix,
+ ].join('.');
+ }
+
+ _retryQueueName(name) {
+ return [
+ ...(this.options.prefix && [this.options.prefix] || []),
+ name,
+ this.options.retrySuffix,
+ this.options.queueSuffix,
+ ].join('.');
+ }
+
+ _retryQueuePolicyName(name) {
+ return [
+ ...(this.options.prefix && [this.options.prefix] || []),
+ ...((name || '').split('.').map(() => '.*')),
+ this.options.retrySuffix,
+ this.options.queueSuffix,
+ ].join('\\.');
+ }
+
+ /**
+ * Note:
+ * - RabbitMQ does not currently support creating quorum queues with message-ttl.
+ * - amqplib does not provide an API to set a ttl policy on the retry queue
+ * @see {@link policyCommand}
+ *
+ * @param {String} name
+ */
+ async establishAMQPPlumbing(name) {
+ const _scope = _fileScope('establishAMQPPlumbing');
+
+ const exchangeOptions = {
+ durable: true, // Exchanges are permanent
+ };
+
+ const queueOptions = {
+ arguments: {
+ 'x-queue-type': this.options.queueType, // quorum for fault-tolerance and data-safety
+ 'x-dead-letter-exchange': this._retryExchangeName(name),
+ },
+ durable: true, // Queues are permanent
+ noAck: false, // Queues expect explicit acknowledgement of delivery
+ };
+
+ const queueRetryOptions = {
+ arguments: {
+ 'x-queue-type': this.options.queueType,
+ 'x-dead-letter-exchange': this._exchangeName(name),
+ // 'x-message-ttl': workerRetryDelayMs, // Cannot currently set on quorum queues, define a policy instead. (No api for that??)
+ },
+ durable: true,
+ noAck: false,
+ };
+
+ const exchangeName = this._exchangeName(name);
+ const retryExchangeName = this._retryExchangeName(name);
+ const queueName = this._queueName(name);
+ const retryQueueName = this._retryQueueName(name);
+
+ this.logger.debug(_scope, 'plumbing', { exchangeName, retryExchangeName, queueName, retryQueueName });
+
+ try {
+ const [ exchange, retryExchange, queue, retryQueue ] = await Promise.all([
+ this.channel.assertExchange(exchangeName, this.options.exchangeType, exchangeOptions),
+ this.channel.assertExchange(retryExchangeName, this.options.exchangeType, exchangeOptions),
+ this.channel.assertQueue(queueName, queueOptions),
+ this.channel.assertQueue(retryQueueName, queueRetryOptions),
+ ]);
+ const [ queueBinding, retryQueueBinding ] = await Promise.all([
+ this.channel.bindQueue(queueName, exchangeName, ''),
+ this.channel.bindQueue(retryQueueName, retryExchangeName, ''),
+ ]);
+ this.logger.debug(_scope, 'plumbing asserted', { exchange, retryExchange, queue, retryQueue, queueBinding, retryQueueBinding });
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, name });
+ await this.close();
+ throw e;
+ }
+ }
+
+ /**
+ * Shutdown the channel and connection.
+ */
+ async close() {
+ const _scope = _fileScope('close');
+ this.logger.debug(_scope, 'called');
+
+ try {
+ if (this.channel) {
+ await this.channel.recover();
+ await this.channel.close();
+ }
+
+ if (this.connection) {
+ await this.connection.close();
+ }
+ this.logger.debug(_scope, 'closed');
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e });
+ throw e;
+ }
+ }
+
+ /**
+ * Minimal health-check, connection is writable.
+ * @returns {Boolean}
+ */
+ health() {
+ return !!this?.connection?.connection?.stream?.writable;
+ }
+
+ /**
+ * Generate an example cli command to create a retry-queue policy.
+ */
+ policyCommand(name) {
+ const settings = {
+ 'message-ttl': this.options.retryDelayMs,
+ };
+ return [
+ 'rabbitmqctl',
+ 'set_policy',
+ 'RetryQueueTTL',
+ `'${this._retryQueuePolicyName(name)}'`,
+ `'${JSON.stringify(settings)}'`,
+ '--apply-to queues',
+ ].join(' ');
+ }
+
+}
+
+module.exports = Base;
\ No newline at end of file