X-Git-Url: http://git.squeep.com/?p=squeep-amqp-helper;a=blobdiff_plain;f=lib%2Fconsumer.js;fp=lib%2Fconsumer.js;h=bd25b02ea6097a222b36c6bc63282c42e8e5957e;hp=0000000000000000000000000000000000000000;hb=174280d3f44ba13dac0b26d42d968189a4f4fa93;hpb=67905316ada5ee4668306506705f4ee2a5f407f0 diff --git a/lib/consumer.js b/lib/consumer.js new file mode 100644 index 0000000..bd25b02 --- /dev/null +++ b/lib/consumer.js @@ -0,0 +1,68 @@ +'use strict'; +const Base = require('./base'); +const common = require('./common'); + +const _fileScope = common.fileScope(__filename); + +class Consumer extends Base { + constructor(logger, options) { + super(logger, options); + + this.queueConsumerTags = {}; + } + + _eventAMQPConnectionClose(err) { + this.queueConsumerTags = {}; + super._eventAMQPConnectionClose(err); + } + + _eventAMQPChannelClose(err) { + this.queueConsumerTags = {}; + super._eventAMQPChannelClose(err); + } + + /** + * @param {String} name + * @param {(channel, message) => Promise} messageHandler + * @param {Object} options + * @param {Integer=} prefetch + */ + async consume(name, messageHandler, options, prefetch = this.options.prefetch) { + const _scope = _fileScope('consume'); + this.logger.debug(_scope, 'called', { name, options, prefetch }); + + const queue = this._queueName(name); + try { + const q = await this.channel.checkQueue(queue); + this.logger.debug(_scope, 'checkQueue', { queue, ...q }); + if (!q) { + this.logger.error(_scope, 'no queue to consume', { queue, ...q }); + throw new Error(`queue '${queue}' does not exist`); + } + this.channel.prefetch(prefetch); + const messageConsumer = this._messageConsumerFactory(messageHandler); + const consumer = await this.channel.consume(queue, messageConsumer, options); + this.queueConsumerTags[queue] = consumer.consumerTag; // eslint-disable-line security/detect-object-injection + this.logger.debug(_scope, 'new consumer', { consumer }); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, queue }); + throw e; + } + } + + _messageConsumerFactory(messageHandler) { + const _scope = _fileScope('_messageConsumer'); + const { channel, logger } = this; + return async (message) => { + try { + return await messageHandler(channel, message); + } catch (e) { + channel.nack(message, false, false); + logger.error(_scope, 'messageHandler failed', { error: e, message }); + } + }; + } + +} + +module.exports = Consumer; \ No newline at end of file