initial commit
[squeep-amqp-helper] / lib / consumer.js
1 'use strict';
2 const Base = require('./base');
3 const common = require('./common');
4
5 const _fileScope = common.fileScope(__filename);
6
/**
 * Queue consumer built on top of Base.
 * Remembers the consumer tag handed back for each queue it subscribes to,
 * and forgets all of them whenever the connection or the channel closes.
 */
class Consumer extends Base {
  /**
   * @param {Object} logger object providing debug() and error()
   * @param {Object} options passed through to Base
   * @param {Integer=} options.prefetch default prefetch used by consume()
   */
  constructor(logger, options) {
    super(logger, options);

    // Maps full queue name -> consumerTag of the active subscription.
    this.queueConsumerTags = {};
  }


  /**
   * Drops every remembered consumer tag, then defers to Base.
   * @param {*} err passed through unchanged to the Base handler
   */
  _eventAMQPConnectionClose(err) {
    this.queueConsumerTags = {};
    super._eventAMQPConnectionClose(err);
  }


  /**
   * Drops every remembered consumer tag, then defers to Base.
   * @param {*} err passed through unchanged to the Base handler
   */
  _eventAMQPChannelClose(err) {
    this.queueConsumerTags = {};
    super._eventAMQPChannelClose(err);
  }


  /**
   * Subscribe a handler to an existing queue and record the resulting
   * consumer tag in queueConsumerTags.
   * @param {String} name mapped to the actual queue name by _queueName()
   * @param {(channel, message) => Promise<void>} messageHandler
   * @param {Object} options passed through to channel.consume()
   * @param {Integer=} prefetch defaults to this.options.prefetch
   * @returns {Promise<void>}
   * @throws {Error} when the queue check yields nothing, or re-thrown from
   *   any failing channel operation (after being logged)
   */
  async consume(name, messageHandler, options, prefetch = this.options.prefetch) {
    const _scope = _fileScope('consume');
    this.logger.debug(_scope, 'called', { name, options, prefetch });

    const queue = this._queueName(name);
    try {
      const q = await this.channel.checkQueue(queue);
      this.logger.debug(_scope, 'checkQueue', { queue, ...q });
      if (!q) {
        this.logger.error(_scope, 'no queue to consume', { queue, ...q });
        throw new Error(`queue '${queue}' does not exist`);
      }
      // Awaited so that a failure here is logged and re-thrown by the catch
      // below rather than becoming an unhandled rejection.
      await this.channel.prefetch(prefetch);
      const messageConsumer = this._messageConsumerFactory(messageHandler);
      const consumer = await this.channel.consume(queue, messageConsumer, options);
      this.queueConsumerTags[queue] = consumer.consumerTag; // eslint-disable-line security/detect-object-injection
      this.logger.debug(_scope, 'new consumer', { consumer });
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, queue });
      throw e;
    }
  }


  /**
   * Wrap a message handler so that a failing handler is logged and the
   * message is rejected without requeue, instead of the error escaping.
   * The wrapper is bound to the channel that is current when this is called.
   * @param {(channel, message) => Promise<void>} messageHandler
   * @returns {(message) => Promise<*>} resolves to the handler's result, or
   *   undefined when the handler failed
   */
  _messageConsumerFactory(messageHandler) {
    const _scope = _fileScope('_messageConsumer');
    const { channel, logger } = this;
    return async (message) => {
      try {
        return await messageHandler(channel, message);
      } catch (e) {
        // Log first, so the handler's error is recorded even if the
        // rejection below fails.
        logger.error(_scope, 'messageHandler failed', { error: e, message });
        if (!message) {
          // Nothing to reject.
          // NOTE(review): presumably a null message signals a
          // server-side consumer cancel -- confirm against the AMQP client.
          return;
        }
        try {
          // allUpTo = false, requeue = false: reject only this message.
          channel.nack(message, false, false);
        } catch (nackError) {
          // Nothing awaits this callback's promise from here, so a throw
          // would surface as an unhandled rejection; log it instead.
          logger.error(_scope, 'nack failed', { error: nackError, message });
        }
      }
    };
  }

}
67
68 module.exports = Consumer;