'use strict';
const amqp = require('amqplib');
const { isFatalError: isFatalAMQPError } = require('amqplib/lib/connection');
-const common = require('./common');
+const { fileScope } = require('@squeep/log-helper');
-const _fileScope = common.fileScope(__filename);
+const _fileScope = fileScope(__filename);
/**
* Base class common to worker publisher and consumer, handling
class Base {
/**
- * @param {Console} logger
- * @param {Object} options
- * @param {String} options.url
- * @param {Number=} options.prefetch
- * @param {Object=} options.socketOptions
- * @param {Boolean=} options.socketOptions.noDelay
- * @param {Number=} options.socketOptions.timeout
- * @param {Boolean=} options.socketOptions.keepAlive
- * @param {Number=} options.socketOptions.keepAliveDelay
- * @param {Object=} options.socketOptions.clientProperties
+ * @typedef {object} ConsoleLike
+ * @property {Function} debug log debug
+ * @property {Function} error log error
+ */
+ /**
+ * @param {ConsoleLike} logger logger instance
+ * @param {object} options options
+ * @param {string} options.url connection url
+ * @param {number=} options.prefetch prefetch
+ * @param {object=} options.socketOptions socket options
+ * @param {boolean=} options.socketOptions.noDelay no delay
+ * @param {number=} options.socketOptions.timeout timeout
+ * @param {boolean=} options.socketOptions.keepAlive keep alive
+ * @param {number=} options.socketOptions.keepAliveDelay keep alive delay
+ * @param {object=} options.socketOptions.clientProperties client properties
*/
constructor(logger, options) {
this.logger = logger;
- this.options = Object.assign({
+ this.options = {
url: undefined,
name: 'messages',
prefix: 'squeep',
queueType: 'quorum',
retryDelayMs: 10000,
prefetch: 1,
- }, options);
- this.options.socketOptions = Object.assign({
+ ...options,
+ };
+ this.options.socketOptions = {
noDelay: undefined,
timeout: undefined,
keepAlive: undefined,
keepAliveDelay: undefined,
clientProperties: undefined,
- }, options.socketOptions);
+ ...options.socketOptions,
+ };
this.connection = undefined;
this.channel = undefined;
/**
- * Establish the necessary connections to the queue and lock services.
+ * Establish the necessary connections to the queue.
*/
async connect() {
const _scope = _fileScope('connect');
* Note:
* - RabbitMQ does not currently support creating quorum queues with message-ttl.
* - amqplib does not provide an API to set a ttl policy on the retry queue
- * @see {@link policyCommand}
- *
- * @param {String} name
+ * @see policyCommand
+ * @param {string} name name
*/
async establishAMQPPlumbing(name) {
const _scope = _fileScope('establishAMQPPlumbing');
/**
* Minimal health-check, connection is writable.
- * @returns {Boolean}
+ * @returns {boolean} status
*/
health() {
return !!this?.connection?.connection?.stream?.writable;
/**
* Generate an example cli command to create a retry-queue policy.
+ * @param {string} name name
+ * @returns {string} policy command
*/
policyCommand(name) {
const settings = {