2 const amqp
= require('amqplib');
3 const { isFatalError: isFatalAMQPError
} = require('amqplib/lib/connection');
4 const { fileScope
} = require('@squeep/log-helper');
6 const _fileScope
= fileScope(__filename
);
9 * Base class common to worker publisher and consumer, handling
15 * @typedef {object} ConsoleLike
16 * @property {Function} debug log debug
17 * @property {Function} error log error
20 * @param {ConsoleLike} logger logger instance
21 * @param {object} options options
22 * @param {string} options.url connection url
23 * @param {number=} options.prefetch prefetch
24 * @param {object=} options.socketOptions socket options
25 * @param {boolean=} options.socketOptions.noDelay no delay
26 * @param {number=} options.socketOptions.timeout timeout
27 * @param {boolean=} options.socketOptions.keepAlive keep alive
28 * @param {number=} options.socketOptions.keepAliveDelay keep alive delay
29 * @param {object=} options.socketOptions.clientProperties client properties
31 constructor(logger
, options
) {
39 exchangeType: 'direct',
45 this.options
.socketOptions
= {
49 keepAliveDelay: undefined,
50 clientProperties: undefined,
51 ...options
.socketOptions
,
54 this.connection
= undefined;
55 this.channel
= undefined;
60 * Establish the necessary connections to the queue.
63 const _scope
= _fileScope('connect');
64 this.logger
.debug(_scope
, 'called');
67 await
this._connectAMQP();
69 this.logger
.error(_scope
, 'failed', { error: e
});
75 async
_connectAMQP() {
76 await
this._establishAMQPConnection();
77 await
this._establishAMQPChannel();
80 async
_establishAMQPConnection() {
81 const _scope
= _fileScope('_establishConnection');
82 const { url
, socketOptions
} = this.options
;
84 this.connection
= await amqp
.connect(url
, socketOptions
);
85 this.connection
.on('close', this._eventAMQPConnectionClose
.bind(this));
86 this.connection
.on('error', this._eventAMQPConnectionError
.bind(this));
87 this.connection
.on('blocked', this._eventAMQPConnectionBlocked
.bind(this));
88 this.connection
.on('unblocked', this._eventAMQPConnectionUnblocked
.bind(this));
90 this.logger
.error(_scope
, 'failed', { error: e
});
95 _eventAMQPConnectionClose(err
) {
96 const _scope
= _fileScope('_eventConnectionClose');
97 this.connection
= undefined;
98 this.channel
= undefined;
99 const isFatal
= isFatalAMQPError(err
);
100 const _log
= isFatal
? this.logger
.error : this.logger
.debug
;
101 _log(_scope
, 'connection close event', { err
});
104 _eventAMQPConnectionError(err
) {
105 const _scope
= _fileScope('_eventConnectionError');
106 this.logger
.error(_scope
, 'connection error event', { err
});
109 _eventAMQPConnectionBlocked(reason
) {
110 const _scope
= _fileScope('_eventAMQPConnectionBlocked');
111 this.logger
.debug(_scope
, 'connection blocked event', { reason
});
114 _eventAMQPConnectionUnblocked() {
115 const _scope
= _fileScope('_eventAMQPConnectionUnblocked');
116 this.logger
.debug(_scope
, 'connection unblocked event');
119 async
_establishAMQPChannel() {
120 const _scope
= _fileScope('_establishChannel');
122 this.channel
= await
this.connection
.createConfirmChannel();
123 this.channel
.on('close', this._eventAMQPChannelClose
.bind(this));
124 this.channel
.on('error', this._eventAMQPChannelError
.bind(this));
125 this.channel
.on('return', this._eventAMQPChannelReturn
.bind(this));
126 this.channel
.on('drain', this._eventAMQPChannelDrain
.bind(this));
128 this.logger
.error(_scope
, 'failed', { error: e
});
133 _eventAMQPChannelClose(err
) {
134 const _scope
= _fileScope('_eventChannelClose');
135 this.channel
= undefined;
136 const isFatal
= err
&& isFatalAMQPError(err
);
137 const _log
= isFatal
? this.logger
.error : this.logger
.debug
;
138 _log(_scope
, 'channel close event', { err
});
141 _eventAMQPChannelError(err
) {
142 const _scope
= _fileScope('_eventChannelError');
143 this.logger
.error(_scope
, 'channel error event', { err
});
146 _eventAMQPChannelReturn(msg
) {
147 const _scope
= _fileScope('_eventChannelReturn');
148 this.logger
.error(_scope
, 'channel return event', { msg
});
151 _eventAMQPChannelDrain() {
152 const _scope
= _fileScope('_eventChannelDrain');
153 this.logger
.debug(_scope
, 'channel drain event');
156 _exchangeName(name
) {
158 ...(this.options
.prefix
&& [this.options
.prefix
] || []),
163 _retryExchangeName(name
) {
165 ...(this.options
.prefix
&& [this.options
.prefix
] || []),
167 this.options
.retrySuffix
,
173 ...(this.options
.prefix
&& [this.options
.prefix
] || []),
175 this.options
.queueSuffix
,
179 _retryQueueName(name
) {
181 ...(this.options
.prefix
&& [this.options
.prefix
] || []),
183 this.options
.retrySuffix
,
184 this.options
.queueSuffix
,
188 _retryQueuePolicyName(name
) {
190 ...(this.options
.prefix
&& [this.options
.prefix
] || []),
191 ...((name
|| '').split('.').map(() => '.*')),
192 this.options
.retrySuffix
,
193 this.options
.queueSuffix
,
199 * - RabbitMQ does not currently support creating quorum queues with message-ttl.
200 * - amqplib does not provide an API to set a ttl policy on the retry queue
202 * @param {string} name name
204 async
establishAMQPPlumbing(name
) {
205 const _scope
= _fileScope('establishAMQPPlumbing');
207 const exchangeOptions
= {
208 durable: true, // Exchanges are permanent
211 const queueOptions
= {
213 'x-queue-type': this.options
.queueType
, // quorum for fault-tolerance and data-safety
214 'x-dead-letter-exchange': this._retryExchangeName(name
),
216 durable: true, // Queues are permanent
217 noAck: false, // Queues expect explicit acknowledgement of delivery
220 const queueRetryOptions
= {
222 'x-queue-type': this.options
.queueType
,
223 'x-dead-letter-exchange': this._exchangeName(name
),
224 // 'x-message-ttl': workerRetryDelayMs, // Cannot currently set on quorum queues, define a policy instead. (No api for that??)
230 const exchangeName
= this._exchangeName(name
);
231 const retryExchangeName
= this._retryExchangeName(name
);
232 const queueName
= this._queueName(name
);
233 const retryQueueName
= this._retryQueueName(name
);
235 this.logger
.debug(_scope
, 'plumbing', { exchangeName
, retryExchangeName
, queueName
, retryQueueName
});
238 const [ exchange
, retryExchange
, queue
, retryQueue
] = await Promise
.all([
239 this.channel
.assertExchange(exchangeName
, this.options
.exchangeType
, exchangeOptions
),
240 this.channel
.assertExchange(retryExchangeName
, this.options
.exchangeType
, exchangeOptions
),
241 this.channel
.assertQueue(queueName
, queueOptions
),
242 this.channel
.assertQueue(retryQueueName
, queueRetryOptions
),
244 const [ queueBinding
, retryQueueBinding
] = await Promise
.all([
245 this.channel
.bindQueue(queueName
, exchangeName
, ''),
246 this.channel
.bindQueue(retryQueueName
, retryExchangeName
, ''),
248 this.logger
.debug(_scope
, 'plumbing asserted', { exchange
, retryExchange
, queue
, retryQueue
, queueBinding
, retryQueueBinding
});
250 this.logger
.error(_scope
, 'failed', { error: e
, name
});
257 * Shutdown the channel and connection.
260 const _scope
= _fileScope('close');
261 this.logger
.debug(_scope
, 'called');
265 await
this.channel
.recover();
266 await
this.channel
.close();
269 if (this.connection
) {
270 await
this.connection
.close();
272 this.logger
.debug(_scope
, 'closed');
274 this.logger
.error(_scope
, 'failed', { error: e
});
280 * Minimal health-check, connection is writable.
281 * @returns {boolean} status
284 return !!this?.connection
?.connection
?.stream
?.writable
;
288 * Generate an example cli command to create a retry-queue policy.
289 * @param {string} name name
290 * @returns {string} policy command
292 policyCommand(name
) {
294 'message-ttl': this.options
.retryDelayMs
,
300 `'${this._retryQueuePolicyName(name)}'`,
301 `'${JSON.stringify(settings)}'`,
308 module
.exports
= Base
;