X-Git-Url: http://git.squeep.com/?p=squeep-amqp-helper;a=blobdiff_plain;f=lib%2Fbase.js;fp=lib%2Fbase.js;h=87607daeb6e7c1402feb490c5a0fc5c513ea6f1a;hp=0000000000000000000000000000000000000000;hb=174280d3f44ba13dac0b26d42d968189a4f4fa93;hpb=67905316ada5ee4668306506705f4ee2a5f407f0 diff --git a/lib/base.js b/lib/base.js new file mode 100644 index 0000000..87607da --- /dev/null +++ b/lib/base.js @@ -0,0 +1,300 @@ +'use strict'; +const amqp = require('amqplib'); +const { isFatalError: isFatalAMQPError } = require('amqplib/lib/connection'); +const common = require('./common'); + +const _fileScope = common.fileScope(__filename); + +/** + * Base class common to worker publisher and consumer, handling + * the AMQP setup. + */ + +class Base { + /** + * @param {Console} logger + * @param {Object} options + * @param {String} options.url + * @param {Number=} options.prefetch + * @param {Object=} options.socketOptions + * @param {Boolean=} options.socketOptions.noDelay + * @param {Number=} options.socketOptions.timeout + * @param {Boolean=} options.socketOptions.keepAlive + * @param {Number=} options.socketOptions.keepAliveDelay + * @param {Object=} options.socketOptions.clientProperties + */ + constructor(logger, options) { + this.logger = logger; + this.options = Object.assign({ + url: undefined, + name: 'messages', + prefix: 'squeep', + queueSuffix: 'queue', + retrySuffix: 'retry', + exchangeType: 'direct', + queueType: 'quorum', + retryDelayMs: 10000, + prefetch: 1, + }, options); + this.options.socketOptions = Object.assign({ + noDelay: undefined, + timeout: undefined, + keepAlive: undefined, + keepAliveDelay: undefined, + clientProperties: undefined, + }, options.socketOptions); + + this.connection = undefined; + this.channel = undefined; + } + + + /** + * Establish the necessary connections to the queue and lock services. + */ + async connect() { + const _scope = _fileScope('connect'); + this.logger.debug(_scope, 'called'); + + try { + await this._connectAMQP(); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e }); + await this.close(); + throw e; + } + } + + async _connectAMQP() { + await this._establishAMQPConnection(); + await this._establishAMQPChannel(); + } + + async _establishAMQPConnection() { + const _scope = _fileScope('_establishConnection'); + const { url, socketOptions } = this.options; + try { + this.connection = await amqp.connect(url, socketOptions); + this.connection.on('close', this._eventAMQPConnectionClose.bind(this)); + this.connection.on('error', this._eventAMQPConnectionError.bind(this)); + this.connection.on('blocked', this._eventAMQPConnectionBlocked.bind(this)); + this.connection.on('unblocked', this._eventAMQPConnectionUnblocked.bind(this)); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e }); + throw e; + } + } + + _eventAMQPConnectionClose(err) { + const _scope = _fileScope('_eventConnectionClose'); + this.connection = undefined; + this.channel = undefined; + const isFatal = isFatalAMQPError(err); + const _log = isFatal ? this.logger.error : this.logger.debug; + _log(_scope, 'connection close event', { err }); + } + + _eventAMQPConnectionError(err) { + const _scope = _fileScope('_eventConnectionError'); + this.logger.error(_scope, 'connection error event', { err }); + } + + _eventAMQPConnectionBlocked(reason) { + const _scope = _fileScope('_eventAMQPConnectionBlocked'); + this.logger.debug(_scope, 'connection blocked event', { reason }); + } + + _eventAMQPConnectionUnblocked() { + const _scope = _fileScope('_eventAMQPConnectionUnblocked'); + this.logger.debug(_scope, 'connection unblocked event'); + } + + async _establishAMQPChannel() { + const _scope = _fileScope('_establishChannel'); + try { + this.channel = await this.connection.createConfirmChannel(); + this.channel.on('close', this._eventAMQPChannelClose.bind(this)); + this.channel.on('error', this._eventAMQPChannelError.bind(this)); + this.channel.on('return', this._eventAMQPChannelReturn.bind(this)); + this.channel.on('drain', this._eventAMQPChannelDrain.bind(this)); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e }); + throw e; + } + } + + _eventAMQPChannelClose(err) { + const _scope = _fileScope('_eventChannelClose'); + this.channel = undefined; + const isFatal = err && isFatalAMQPError(err); + const _log = isFatal ? this.logger.error : this.logger.debug; + _log(_scope, 'channel close event', { err }); + } + + _eventAMQPChannelError(err) { + const _scope = _fileScope('_eventChannelError'); + this.logger.error(_scope, 'channel error event', { err }); + } + + _eventAMQPChannelReturn(msg) { + const _scope = _fileScope('_eventChannelReturn'); + this.logger.error(_scope, 'channel return event', { msg }); + } + + _eventAMQPChannelDrain() { + const _scope = _fileScope('_eventChannelDrain'); + this.logger.debug(_scope, 'channel drain event'); + } + + _exchangeName(name) { + return [ + ...(this.options.prefix && [this.options.prefix] || []), + name, + ].join('.'); + } + + _retryExchangeName(name) { + return [ + ...(this.options.prefix && [this.options.prefix] || []), + name, + this.options.retrySuffix, + ].join('.'); + } + + _queueName(name) { + return [ + ...(this.options.prefix && [this.options.prefix] || []), + name, + this.options.queueSuffix, + ].join('.'); + } + + _retryQueueName(name) { + return [ + ...(this.options.prefix && [this.options.prefix] || []), + name, + this.options.retrySuffix, + this.options.queueSuffix, + ].join('.'); + } + + _retryQueuePolicyName(name) { + return [ + ...(this.options.prefix && [this.options.prefix] || []), + ...((name || '').split('.').map(() => '.*')), + this.options.retrySuffix, + this.options.queueSuffix, + ].join('\\.'); + } + + /** + * Note: + * - RabbitMQ does not currently support creating quorum queues with message-ttl. + * - amqplib does not provide an API to set a ttl policy on the retry queue + * @see {@link policyCommand} + * + * @param {String} name + */ + async establishAMQPPlumbing(name) { + const _scope = _fileScope('establishAMQPPlumbing'); + + const exchangeOptions = { + durable: true, // Exchanges are permanent + }; + + const queueOptions = { + arguments: { + 'x-queue-type': this.options.queueType, // quorum for fault-tolerance and data-safety + 'x-dead-letter-exchange': this._retryExchangeName(name), + }, + durable: true, // Queues are permanent + noAck: false, // Queues expect explicit acknowledgement of delivery + }; + + const queueRetryOptions = { + arguments: { + 'x-queue-type': this.options.queueType, + 'x-dead-letter-exchange': this._exchangeName(name), + // 'x-message-ttl': workerRetryDelayMs, // Cannot currently set on quorum queues, define a policy instead. (No api for that??) + }, + durable: true, + noAck: false, + }; + + const exchangeName = this._exchangeName(name); + const retryExchangeName = this._retryExchangeName(name); + const queueName = this._queueName(name); + const retryQueueName = this._retryQueueName(name); + + this.logger.debug(_scope, 'plumbing', { exchangeName, retryExchangeName, queueName, retryQueueName }); + + try { + const [ exchange, retryExchange, queue, retryQueue ] = await Promise.all([ + this.channel.assertExchange(exchangeName, this.options.exchangeType, exchangeOptions), + this.channel.assertExchange(retryExchangeName, this.options.exchangeType, exchangeOptions), + this.channel.assertQueue(queueName, queueOptions), + this.channel.assertQueue(retryQueueName, queueRetryOptions), + ]); + const [ queueBinding, retryQueueBinding ] = await Promise.all([ + this.channel.bindQueue(queueName, exchangeName, ''), + this.channel.bindQueue(retryQueueName, retryExchangeName, ''), + ]); + this.logger.debug(_scope, 'plumbing asserted', { exchange, retryExchange, queue, retryQueue, queueBinding, retryQueueBinding }); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, name }); + await this.close(); + throw e; + } + } + + /** + * Shutdown the channel and connection. + */ + async close() { + const _scope = _fileScope('close'); + this.logger.debug(_scope, 'called'); + + try { + if (this.channel) { + await this.channel.recover(); + await this.channel.close(); + } + + if (this.connection) { + await this.connection.close(); + } + this.logger.debug(_scope, 'closed'); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e }); + throw e; + } + } + + /** + * Minimal health-check, connection is writable. + * @returns {Boolean} + */ + health() { + return !!this?.connection?.connection?.stream?.writable; + } + + /** + * Generate an example cli command to create a retry-queue policy. + */ + policyCommand(name) { + const settings = { + 'message-ttl': this.options.retryDelayMs, + }; + return [ + 'rabbitmqctl', + 'set_policy', + 'RetryQueueTTL', + `'${this._retryQueuePolicyName(name)}'`, + `'${JSON.stringify(settings)}'`, + '--apply-to queues', + ].join(' '); + } + +} + +module.exports = Base; \ No newline at end of file