--- /dev/null
+'use strict';
+const Base = require('./base');
+const common = require('./common');
+
+const _fileScope = common.fileScope(__filename);
+
+class Consumer extends Base {
+ constructor(logger, options) {
+ super(logger, options);
+
+ this.queueConsumerTags = {};
+ }
+
+ _eventAMQPConnectionClose(err) {
+ this.queueConsumerTags = {};
+ super._eventAMQPConnectionClose(err);
+ }
+
+ _eventAMQPChannelClose(err) {
+ this.queueConsumerTags = {};
+ super._eventAMQPChannelClose(err);
+ }
+
+ /**
+ * @param {String} name
+ * @param {(channel, message) => Promise<void>} messageHandler
+ * @param {Object} options
+ * @param {Integer=} prefetch
+ */
+ async consume(name, messageHandler, options, prefetch = this.options.prefetch) {
+ const _scope = _fileScope('consume');
+ this.logger.debug(_scope, 'called', { name, options, prefetch });
+
+ const queue = this._queueName(name);
+ try {
+ const q = await this.channel.checkQueue(queue);
+ this.logger.debug(_scope, 'checkQueue', { queue, ...q });
+ if (!q) {
+ this.logger.error(_scope, 'no queue to consume', { queue, ...q });
+ throw new Error(`queue '${queue}' does not exist`);
+ }
+ this.channel.prefetch(prefetch);
+ const messageConsumer = this._messageConsumerFactory(messageHandler);
+ const consumer = await this.channel.consume(queue, messageConsumer, options);
+ this.queueConsumerTags[queue] = consumer.consumerTag; // eslint-disable-line security/detect-object-injection
+ this.logger.debug(_scope, 'new consumer', { consumer });
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, queue });
+ throw e;
+ }
+ }
+
+ _messageConsumerFactory(messageHandler) {
+ const _scope = _fileScope('_messageConsumer');
+ const { channel, logger } = this;
+ return async (message) => {
+ try {
+ return await messageHandler(channel, message);
+ } catch (e) {
+ channel.nack(message, false, false);
+ logger.error(_scope, 'messageHandler failed', { error: e, message });
+ }
+ };
+ }
+
+}
+
+module.exports = Consumer;
\ No newline at end of file