2 const Base
= require('./base');
3 const { fileScope
} = require('@squeep/log-helper');
5 const _fileScope
= fileScope(__filename
);
7 class Consumer
extends Base
{
8 constructor(logger
, options
) {
9 super(logger
, options
);
11 this.queueConsumerTags
= {};
14 _eventAMQPConnectionClose(err
) {
15 this.queueConsumerTags
= {};
16 super._eventAMQPConnectionClose(err
);
19 _eventAMQPChannelClose(err
) {
20 this.queueConsumerTags
= {};
21 super._eventAMQPChannelClose(err
);
25 * @param {string} name name
26 * @param {(channel: string, message: any) => Promise<void>} messageHandler message handler
27 * @param {object} options options
28 * @param {number=} prefetch prefetch
30 async
consume(name
, messageHandler
, options
, prefetch
= this.options
.prefetch
) {
31 const _scope
= _fileScope('consume');
32 this.logger
.debug(_scope
, 'called', { name
, options
, prefetch
});
34 const queue
= this._queueName(name
);
36 const q
= await
this.channel
.checkQueue(queue
);
37 this.logger
.debug(_scope
, 'checkQueue', { queue
, ...q
});
39 this.logger
.error(_scope
, 'no queue to consume', { queue
, ...q
});
40 throw new Error(`queue '${queue}' does not exist`);
42 this.channel
.prefetch(prefetch
);
43 const messageConsumer
= this._messageConsumerFactory(messageHandler
);
44 const consumer
= await
this.channel
.consume(queue
, messageConsumer
, options
);
45 this.queueConsumerTags
[queue
] = consumer
.consumerTag
; // eslint-disable-line security/detect-object-injection
46 this.logger
.debug(_scope
, 'new consumer', { consumer
});
48 this.logger
.error(_scope
, 'failed', { error: e
, queue
});
53 _messageConsumerFactory(messageHandler
) {
54 const _scope
= _fileScope('_messageConsumer');
55 const { channel
, logger
} = this;
56 return async (message
) => {
58 return await
messageHandler(channel
, message
);
60 channel
.nack(message
, false, false);
61 logger
.error(_scope
, 'messageHandler failed', { error: e
, message
});
68 module
.exports
= Consumer
;