initial commit
[squeep-amqp-helper] / lib / consumer.js
diff --git a/lib/consumer.js b/lib/consumer.js
new file mode 100644 (file)
index 0000000..bd25b02
--- /dev/null
@@ -0,0 +1,68 @@
+'use strict';
+const Base = require('./base');
+const common = require('./common');
+
+const _fileScope = common.fileScope(__filename);
+
+class Consumer extends Base {
+  constructor(logger, options) {
+    super(logger, options);
+
+    this.queueConsumerTags = {};
+  }
+
+  _eventAMQPConnectionClose(err) {
+    this.queueConsumerTags = {};
+    super._eventAMQPConnectionClose(err);
+  }
+
+  _eventAMQPChannelClose(err) {
+    this.queueConsumerTags = {};
+    super._eventAMQPChannelClose(err);
+  }
+
+  /**
+   * @param {String} name
+   * @param {(channel, message) => Promise<void>} messageHandler 
+   * @param {Object} options 
+   * @param {Integer=} prefetch 
+   */
+  async consume(name, messageHandler, options, prefetch = this.options.prefetch) {
+    const _scope = _fileScope('consume');
+    this.logger.debug(_scope, 'called', { name, options, prefetch });
+
+    const queue = this._queueName(name);
+    try {
+      const q = await this.channel.checkQueue(queue);
+      this.logger.debug(_scope, 'checkQueue', { queue, ...q });
+      if (!q) {
+        this.logger.error(_scope, 'no queue to consume', { queue, ...q });
+        throw new Error(`queue '${queue}' does not exist`);
+      }
+      this.channel.prefetch(prefetch);
+      const messageConsumer = this._messageConsumerFactory(messageHandler);
+      const consumer = await this.channel.consume(queue, messageConsumer, options);
+      this.queueConsumerTags[queue] = consumer.consumerTag; // eslint-disable-line security/detect-object-injection
+      this.logger.debug(_scope, 'new consumer', { consumer });
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, queue });
+      throw e;
+    }
+  }
+
+  _messageConsumerFactory(messageHandler) {
+    const _scope = _fileScope('_messageConsumer');
+    const { channel, logger } = this;
+    return async (message) => {
+      try {
+        return await messageHandler(channel, message);
+      } catch (e) {
+        channel.nack(message, false, false);
+        logger.error(_scope, 'messageHandler failed', { error: e, message });
+      }
+    };
+  }
+
+}
+
+module.exports = Consumer;
\ No newline at end of file