X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=lib%2Fbase.js;h=6dc5ecb1b2ecc453d1457270172e39c66daede44;hb=ac3766421358ef97b80a224582a81c5aebac1f46;hp=87607daeb6e7c1402feb490c5a0fc5c513ea6f1a;hpb=174280d3f44ba13dac0b26d42d968189a4f4fa93;p=squeep-amqp-helper diff --git a/lib/base.js b/lib/base.js index 87607da..6dc5ecb 100644 --- a/lib/base.js +++ b/lib/base.js @@ -1,9 +1,9 @@ 'use strict'; const amqp = require('amqplib'); const { isFatalError: isFatalAMQPError } = require('amqplib/lib/connection'); -const common = require('./common'); +const { fileScope } = require('@squeep/log-helper'); -const _fileScope = common.fileScope(__filename); +const _fileScope = fileScope(__filename); /** * Base class common to worker publisher and consumer, handling @@ -12,20 +12,25 @@ const _fileScope = common.fileScope(__filename); class Base { /** - * @param {Console} logger - * @param {Object} options - * @param {String} options.url - * @param {Number=} options.prefetch - * @param {Object=} options.socketOptions - * @param {Boolean=} options.socketOptions.noDelay - * @param {Number=} options.socketOptions.timeout - * @param {Boolean=} options.socketOptions.keepAlive - * @param {Number=} options.socketOptions.keepAliveDelay - * @param {Object=} options.socketOptions.clientProperties + * @typedef {object} ConsoleLike + * @property {Function} debug log debug + * @property {Function} error log error + */ + /** + * @param {ConsoleLike} logger logger instance + * @param {object} options options + * @param {string} options.url connection url + * @param {number=} options.prefetch prefetch + * @param {object=} options.socketOptions socket options + * @param {boolean=} options.socketOptions.noDelay no delay + * @param {number=} options.socketOptions.timeout timeout + * @param {boolean=} options.socketOptions.keepAlive keep alive + * @param {number=} options.socketOptions.keepAliveDelay keep alive delay + * @param {object=} options.socketOptions.clientProperties client properties */ constructor(logger, options) { this.logger = logger; - this.options = Object.assign({ + this.options = { url: undefined, name: 'messages', prefix: 'squeep', @@ -35,14 +40,16 @@ class Base { queueType: 'quorum', retryDelayMs: 10000, prefetch: 1, - }, options); - this.options.socketOptions = Object.assign({ + ...options, + }; + this.options.socketOptions = { noDelay: undefined, timeout: undefined, keepAlive: undefined, keepAliveDelay: undefined, clientProperties: undefined, - }, options.socketOptions); + ...options.socketOptions, + }; this.connection = undefined; this.channel = undefined; @@ -50,7 +57,7 @@ class Base { /** - * Establish the necessary connections to the queue and lock services. + * Establish the necessary connections to the queue. */ async connect() { const _scope = _fileScope('connect'); @@ -191,9 +198,8 @@ class Base { * Note: * - RabbitMQ does not currently support creating quorum queues with message-ttl. * - amqplib does not provide an API to set a ttl policy on the retry queue - * @see {@link policyCommand} - * - * @param {String} name + * @see policyCommand + * @param {string} name name */ async establishAMQPPlumbing(name) { const _scope = _fileScope('establishAMQPPlumbing'); @@ -272,7 +278,7 @@ class Base { /** * Minimal health-check, connection is writable. - * @returns {Boolean} + * @returns {boolean} status */ health() { return !!this?.connection?.connection?.stream?.writable; @@ -280,6 +286,8 @@ class Base { /** * Generate an example cli command to create a retry-queue policy. + * @param {string} name name + * @returns {string} policy command */ policyCommand(name) { const settings = {