2 const amqp
= require('amqplib');
3 const { isFatalError: isFatalAMQPError
} = require('amqplib/lib/connection');
4 const common
= require('./common');
6 const _fileScope
= common
.fileScope(__filename
);
9 * Base class common to worker publisher and consumer, handling
15 * @param {Console} logger
16 * @param {Object} options
17 * @param {String} options.url
18 * @param {Number=} options.prefetch
19 * @param {Object=} options.socketOptions
20 * @param {Boolean=} options.socketOptions.noDelay
21 * @param {Number=} options.socketOptions.timeout
22 * @param {Boolean=} options.socketOptions.keepAlive
23 * @param {Number=} options.socketOptions.keepAliveDelay
24 * @param {Object=} options.socketOptions.clientProperties
26 constructor(logger
, options
) {
28 this.options
= Object
.assign({
34 exchangeType: 'direct',
39 this.options
.socketOptions
= Object
.assign({
43 keepAliveDelay: undefined,
44 clientProperties: undefined,
45 }, options
.socketOptions
);
47 this.connection
= undefined;
48 this.channel
= undefined;
53 * Establish the necessary connections to the queue and lock services.
56 const _scope
= _fileScope('connect');
57 this.logger
.debug(_scope
, 'called');
60 await
this._connectAMQP();
62 this.logger
.error(_scope
, 'failed', { error: e
});
68 async
_connectAMQP() {
69 await
this._establishAMQPConnection();
70 await
this._establishAMQPChannel();
73 async
_establishAMQPConnection() {
74 const _scope
= _fileScope('_establishConnection');
75 const { url
, socketOptions
} = this.options
;
77 this.connection
= await amqp
.connect(url
, socketOptions
);
78 this.connection
.on('close', this._eventAMQPConnectionClose
.bind(this));
79 this.connection
.on('error', this._eventAMQPConnectionError
.bind(this));
80 this.connection
.on('blocked', this._eventAMQPConnectionBlocked
.bind(this));
81 this.connection
.on('unblocked', this._eventAMQPConnectionUnblocked
.bind(this));
83 this.logger
.error(_scope
, 'failed', { error: e
});
88 _eventAMQPConnectionClose(err
) {
89 const _scope
= _fileScope('_eventConnectionClose');
90 this.connection
= undefined;
91 this.channel
= undefined;
92 const isFatal
= isFatalAMQPError(err
);
93 const _log
= isFatal
? this.logger
.error : this.logger
.debug
;
94 _log(_scope
, 'connection close event', { err
});
97 _eventAMQPConnectionError(err
) {
98 const _scope
= _fileScope('_eventConnectionError');
99 this.logger
.error(_scope
, 'connection error event', { err
});
102 _eventAMQPConnectionBlocked(reason
) {
103 const _scope
= _fileScope('_eventAMQPConnectionBlocked');
104 this.logger
.debug(_scope
, 'connection blocked event', { reason
});
107 _eventAMQPConnectionUnblocked() {
108 const _scope
= _fileScope('_eventAMQPConnectionUnblocked');
109 this.logger
.debug(_scope
, 'connection unblocked event');
112 async
_establishAMQPChannel() {
113 const _scope
= _fileScope('_establishChannel');
115 this.channel
= await
this.connection
.createConfirmChannel();
116 this.channel
.on('close', this._eventAMQPChannelClose
.bind(this));
117 this.channel
.on('error', this._eventAMQPChannelError
.bind(this));
118 this.channel
.on('return', this._eventAMQPChannelReturn
.bind(this));
119 this.channel
.on('drain', this._eventAMQPChannelDrain
.bind(this));
121 this.logger
.error(_scope
, 'failed', { error: e
});
126 _eventAMQPChannelClose(err
) {
127 const _scope
= _fileScope('_eventChannelClose');
128 this.channel
= undefined;
129 const isFatal
= err
&& isFatalAMQPError(err
);
130 const _log
= isFatal
? this.logger
.error : this.logger
.debug
;
131 _log(_scope
, 'channel close event', { err
});
134 _eventAMQPChannelError(err
) {
135 const _scope
= _fileScope('_eventChannelError');
136 this.logger
.error(_scope
, 'channel error event', { err
});
139 _eventAMQPChannelReturn(msg
) {
140 const _scope
= _fileScope('_eventChannelReturn');
141 this.logger
.error(_scope
, 'channel return event', { msg
});
144 _eventAMQPChannelDrain() {
145 const _scope
= _fileScope('_eventChannelDrain');
146 this.logger
.debug(_scope
, 'channel drain event');
149 _exchangeName(name
) {
151 ...(this.options
.prefix
&& [this.options
.prefix
] || []),
156 _retryExchangeName(name
) {
158 ...(this.options
.prefix
&& [this.options
.prefix
] || []),
160 this.options
.retrySuffix
,
166 ...(this.options
.prefix
&& [this.options
.prefix
] || []),
168 this.options
.queueSuffix
,
172 _retryQueueName(name
) {
174 ...(this.options
.prefix
&& [this.options
.prefix
] || []),
176 this.options
.retrySuffix
,
177 this.options
.queueSuffix
,
181 _retryQueuePolicyName(name
) {
183 ...(this.options
.prefix
&& [this.options
.prefix
] || []),
184 ...((name
|| '').split('.').map(() => '.*')),
185 this.options
.retrySuffix
,
186 this.options
.queueSuffix
,
192 * - RabbitMQ does not currently support creating quorum queues with message-ttl.
193 * - amqplib does not provide an API to set a ttl policy on the retry queue
194 * @see {@link policyCommand}
196 * @param {String} name
198 async
establishAMQPPlumbing(name
) {
199 const _scope
= _fileScope('establishAMQPPlumbing');
201 const exchangeOptions
= {
202 durable: true, // Exchanges are permanent
205 const queueOptions
= {
207 'x-queue-type': this.options
.queueType
, // quorum for fault-tolerance and data-safety
208 'x-dead-letter-exchange': this._retryExchangeName(name
),
210 durable: true, // Queues are permanent
211 noAck: false, // Queues expect explicit acknowledgement of delivery
214 const queueRetryOptions
= {
216 'x-queue-type': this.options
.queueType
,
217 'x-dead-letter-exchange': this._exchangeName(name
),
218 // 'x-message-ttl': workerRetryDelayMs, // Cannot currently set on quorum queues, define a policy instead. (No api for that??)
224 const exchangeName
= this._exchangeName(name
);
225 const retryExchangeName
= this._retryExchangeName(name
);
226 const queueName
= this._queueName(name
);
227 const retryQueueName
= this._retryQueueName(name
);
229 this.logger
.debug(_scope
, 'plumbing', { exchangeName
, retryExchangeName
, queueName
, retryQueueName
});
232 const [ exchange
, retryExchange
, queue
, retryQueue
] = await Promise
.all([
233 this.channel
.assertExchange(exchangeName
, this.options
.exchangeType
, exchangeOptions
),
234 this.channel
.assertExchange(retryExchangeName
, this.options
.exchangeType
, exchangeOptions
),
235 this.channel
.assertQueue(queueName
, queueOptions
),
236 this.channel
.assertQueue(retryQueueName
, queueRetryOptions
),
238 const [ queueBinding
, retryQueueBinding
] = await Promise
.all([
239 this.channel
.bindQueue(queueName
, exchangeName
, ''),
240 this.channel
.bindQueue(retryQueueName
, retryExchangeName
, ''),
242 this.logger
.debug(_scope
, 'plumbing asserted', { exchange
, retryExchange
, queue
, retryQueue
, queueBinding
, retryQueueBinding
});
244 this.logger
.error(_scope
, 'failed', { error: e
, name
});
251 * Shutdown the channel and connection.
254 const _scope
= _fileScope('close');
255 this.logger
.debug(_scope
, 'called');
259 await
this.channel
.recover();
260 await
this.channel
.close();
263 if (this.connection
) {
264 await
this.connection
.close();
266 this.logger
.debug(_scope
, 'closed');
268 this.logger
.error(_scope
, 'failed', { error: e
});
274 * Minimal health-check, connection is writable.
278 return !!this?.connection
?.connection
?.stream
?.writable
;
282 * Generate an example cli command to create a retry-queue policy.
284 policyCommand(name
) {
286 'message-ttl': this.options
.retryDelayMs
,
292 `'${this._retryQueuePolicyName(name)}'`,
293 `'${JSON.stringify(settings)}'`,
300 module
.exports
= Base
;