2 const amqp
= require('amqplib');
3 const { isFatalError: isFatalAMQPError
} = require('amqplib/lib/connection');
4 const { fileScope
} = require('@squeep/log-helper');
6 const _fileScope
= fileScope(__filename
);
9 * Base class common to worker publisher and consumer, handling
15 * @param {Console} logger
16 * @param {Object} options
17 * @param {String} options.url
18 * @param {Number=} options.prefetch
19 * @param {Object=} options.socketOptions
20 * @param {Boolean=} options.socketOptions.noDelay
21 * @param {Number=} options.socketOptions.timeout
22 * @param {Boolean=} options.socketOptions.keepAlive
23 * @param {Number=} options.socketOptions.keepAliveDelay
24 * @param {Object=} options.socketOptions.clientProperties
26 constructor(logger
, options
) {
34 exchangeType: 'direct',
40 this.options
.socketOptions
= {
44 keepAliveDelay: undefined,
45 clientProperties: undefined,
46 ...options
.socketOptions
,
49 this.connection
= undefined;
50 this.channel
= undefined;
55 * Establish the necessary connections to the queue.
58 const _scope
= _fileScope('connect');
59 this.logger
.debug(_scope
, 'called');
62 await
this._connectAMQP();
64 this.logger
.error(_scope
, 'failed', { error: e
});
70 async
_connectAMQP() {
71 await
this._establishAMQPConnection();
72 await
this._establishAMQPChannel();
75 async
_establishAMQPConnection() {
76 const _scope
= _fileScope('_establishConnection');
77 const { url
, socketOptions
} = this.options
;
79 this.connection
= await amqp
.connect(url
, socketOptions
);
80 this.connection
.on('close', this._eventAMQPConnectionClose
.bind(this));
81 this.connection
.on('error', this._eventAMQPConnectionError
.bind(this));
82 this.connection
.on('blocked', this._eventAMQPConnectionBlocked
.bind(this));
83 this.connection
.on('unblocked', this._eventAMQPConnectionUnblocked
.bind(this));
85 this.logger
.error(_scope
, 'failed', { error: e
});
90 _eventAMQPConnectionClose(err
) {
91 const _scope
= _fileScope('_eventConnectionClose');
92 this.connection
= undefined;
93 this.channel
= undefined;
94 const isFatal
= isFatalAMQPError(err
);
95 const _log
= isFatal
? this.logger
.error : this.logger
.debug
;
96 _log(_scope
, 'connection close event', { err
});
99 _eventAMQPConnectionError(err
) {
100 const _scope
= _fileScope('_eventConnectionError');
101 this.logger
.error(_scope
, 'connection error event', { err
});
104 _eventAMQPConnectionBlocked(reason
) {
105 const _scope
= _fileScope('_eventAMQPConnectionBlocked');
106 this.logger
.debug(_scope
, 'connection blocked event', { reason
});
109 _eventAMQPConnectionUnblocked() {
110 const _scope
= _fileScope('_eventAMQPConnectionUnblocked');
111 this.logger
.debug(_scope
, 'connection unblocked event');
114 async
_establishAMQPChannel() {
115 const _scope
= _fileScope('_establishChannel');
117 this.channel
= await
this.connection
.createConfirmChannel();
118 this.channel
.on('close', this._eventAMQPChannelClose
.bind(this));
119 this.channel
.on('error', this._eventAMQPChannelError
.bind(this));
120 this.channel
.on('return', this._eventAMQPChannelReturn
.bind(this));
121 this.channel
.on('drain', this._eventAMQPChannelDrain
.bind(this));
123 this.logger
.error(_scope
, 'failed', { error: e
});
128 _eventAMQPChannelClose(err
) {
129 const _scope
= _fileScope('_eventChannelClose');
130 this.channel
= undefined;
131 const isFatal
= err
&& isFatalAMQPError(err
);
132 const _log
= isFatal
? this.logger
.error : this.logger
.debug
;
133 _log(_scope
, 'channel close event', { err
});
136 _eventAMQPChannelError(err
) {
137 const _scope
= _fileScope('_eventChannelError');
138 this.logger
.error(_scope
, 'channel error event', { err
});
141 _eventAMQPChannelReturn(msg
) {
142 const _scope
= _fileScope('_eventChannelReturn');
143 this.logger
.error(_scope
, 'channel return event', { msg
});
146 _eventAMQPChannelDrain() {
147 const _scope
= _fileScope('_eventChannelDrain');
148 this.logger
.debug(_scope
, 'channel drain event');
151 _exchangeName(name
) {
153 ...(this.options
.prefix
&& [this.options
.prefix
] || []),
158 _retryExchangeName(name
) {
160 ...(this.options
.prefix
&& [this.options
.prefix
] || []),
162 this.options
.retrySuffix
,
168 ...(this.options
.prefix
&& [this.options
.prefix
] || []),
170 this.options
.queueSuffix
,
174 _retryQueueName(name
) {
176 ...(this.options
.prefix
&& [this.options
.prefix
] || []),
178 this.options
.retrySuffix
,
179 this.options
.queueSuffix
,
183 _retryQueuePolicyName(name
) {
185 ...(this.options
.prefix
&& [this.options
.prefix
] || []),
186 ...((name
|| '').split('.').map(() => '.*')),
187 this.options
.retrySuffix
,
188 this.options
.queueSuffix
,
194 * - RabbitMQ does not currently support creating quorum queues with message-ttl.
195 * - amqplib does not provide an API to set a ttl policy on the retry queue
196 * @see {@link policyCommand}
198 * @param {String} name
200 async
establishAMQPPlumbing(name
) {
201 const _scope
= _fileScope('establishAMQPPlumbing');
203 const exchangeOptions
= {
204 durable: true, // Exchanges are permanent
207 const queueOptions
= {
209 'x-queue-type': this.options
.queueType
, // quorum for fault-tolerance and data-safety
210 'x-dead-letter-exchange': this._retryExchangeName(name
),
212 durable: true, // Queues are permanent
213 noAck: false, // Queues expect explicit acknowledgement of delivery
216 const queueRetryOptions
= {
218 'x-queue-type': this.options
.queueType
,
219 'x-dead-letter-exchange': this._exchangeName(name
),
220 // 'x-message-ttl': workerRetryDelayMs, // Cannot currently set on quorum queues, define a policy instead. (No api for that??)
226 const exchangeName
= this._exchangeName(name
);
227 const retryExchangeName
= this._retryExchangeName(name
);
228 const queueName
= this._queueName(name
);
229 const retryQueueName
= this._retryQueueName(name
);
231 this.logger
.debug(_scope
, 'plumbing', { exchangeName
, retryExchangeName
, queueName
, retryQueueName
});
234 const [ exchange
, retryExchange
, queue
, retryQueue
] = await Promise
.all([
235 this.channel
.assertExchange(exchangeName
, this.options
.exchangeType
, exchangeOptions
),
236 this.channel
.assertExchange(retryExchangeName
, this.options
.exchangeType
, exchangeOptions
),
237 this.channel
.assertQueue(queueName
, queueOptions
),
238 this.channel
.assertQueue(retryQueueName
, queueRetryOptions
),
240 const [ queueBinding
, retryQueueBinding
] = await Promise
.all([
241 this.channel
.bindQueue(queueName
, exchangeName
, ''),
242 this.channel
.bindQueue(retryQueueName
, retryExchangeName
, ''),
244 this.logger
.debug(_scope
, 'plumbing asserted', { exchange
, retryExchange
, queue
, retryQueue
, queueBinding
, retryQueueBinding
});
246 this.logger
.error(_scope
, 'failed', { error: e
, name
});
253 * Shutdown the channel and connection.
256 const _scope
= _fileScope('close');
257 this.logger
.debug(_scope
, 'called');
261 await
this.channel
.recover();
262 await
this.channel
.close();
265 if (this.connection
) {
266 await
this.connection
.close();
268 this.logger
.debug(_scope
, 'closed');
270 this.logger
.error(_scope
, 'failed', { error: e
});
276 * Minimal health-check, connection is writable.
280 return !!this?.connection
?.connection
?.stream
?.writable
;
284 * Generate an example cli command to create a retry-queue policy.
286 policyCommand(name
) {
288 'message-ttl': this.options
.retryDelayMs
,
294 `'${this._retryQueuePolicyName(name)}'`,
295 `'${JSON.stringify(settings)}'`,
302 module
.exports
= Base
;