bump package version to 1.0.1
[squeep-amqp-helper] / lib / base.js
1 'use strict';
2 const amqp = require('amqplib');
3 const { isFatalError: isFatalAMQPError } = require('amqplib/lib/connection');
4 const { fileScope } = require('@squeep/log-helper');
5
6 const _fileScope = fileScope(__filename);
7
/**
 * Base class common to worker publisher and consumer, handling
 * the AMQP setup: connection, confirm channel, event logging, and the
 * exchange/queue naming and declaration shared by both sides.
 */
class Base {
  /**
   * @typedef {object} ConsoleLike
   * @property {Function} debug log debug
   * @property {Function} error log error
   */
  /**
   * @param {ConsoleLike} logger logger instance
   * @param {object=} options options, merged over the defaults below
   * @param {string} options.url connection url
   * @param {string=} options.name default 'messages' (not read by this class itself)
   * @param {string=} options.prefix leading segment of every generated name, default 'squeep'; falsy omits it
   * @param {string=} options.queueSuffix trailing segment of queue names, default 'queue'
   * @param {string=} options.retrySuffix segment marking retry exchanges and queues, default 'retry'
   * @param {string=} options.exchangeType type passed when asserting exchanges, default 'direct'
   * @param {string=} options.queueType value of the 'x-queue-type' queue argument, default 'quorum'
   * @param {number=} options.retryDelayMs 'message-ttl' emitted by policyCommand, default 10000
   * @param {number=} options.prefetch prefetch, default 1 (not read by this class itself)
   * @param {object=} options.socketOptions socket options
   * @param {boolean=} options.socketOptions.noDelay no delay
   * @param {number=} options.socketOptions.timeout timeout
   * @param {boolean=} options.socketOptions.keepAlive keep alive
   * @param {number=} options.socketOptions.keepAliveDelay keep alive delay
   * @param {object=} options.socketOptions.clientProperties client properties
   */
  constructor(logger, options) {
    this.logger = logger;
    this.options = {
      url: undefined,
      name: 'messages',
      prefix: 'squeep',
      queueSuffix: 'queue',
      retrySuffix: 'retry',
      exchangeType: 'direct',
      queueType: 'quorum',
      retryDelayMs: 10000,
      prefetch: 1,
      ...options,
    };
    // Optional chaining lets the class be constructed without an options
    // argument; every setting has a default.
    this.options.socketOptions = {
      noDelay: undefined,
      timeout: undefined,
      keepAlive: undefined,
      keepAliveDelay: undefined,
      clientProperties: undefined,
      ...options?.socketOptions,
    };

    this.connection = undefined;
    this.channel = undefined;
  }


  /**
   * Establish the necessary connections to the queue.
   * On failure, whatever was opened is closed again and the original
   * error is rethrown.
   * @returns {Promise<void>}
   */
  async connect() {
    const _scope = _fileScope('connect');
    this.logger.debug(_scope, 'called');

    try {
      await this._connectAMQP();
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e });
      try {
        await this.close();
      } catch (closeError) { // eslint-disable-line no-unused-vars
        // close() has already logged its own failure; the caller needs
        // the error which caused the connect to fail, not the cleanup one.
      }
      throw e;
    }
  }


  /**
   * Open the connection, then the channel on top of it.
   * @returns {Promise<void>}
   */
  async _connectAMQP() {
    await this._establishAMQPConnection();
    await this._establishAMQPChannel();
  }


  /**
   * Connect using options.url and options.socketOptions, store the result
   * on this.connection, and attach the connection event loggers.
   * @returns {Promise<void>}
   */
  async _establishAMQPConnection() {
    const _scope = _fileScope('_establishConnection');
    const { url, socketOptions } = this.options;
    try {
      this.connection = await amqp.connect(url, socketOptions);
      this.connection.on('close', this._eventAMQPConnectionClose.bind(this));
      this.connection.on('error', this._eventAMQPConnectionError.bind(this));
      this.connection.on('blocked', this._eventAMQPConnectionBlocked.bind(this));
      this.connection.on('unblocked', this._eventAMQPConnectionUnblocked.bind(this));
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e });
      throw e;
    }
  }


  /**
   * Connection 'close' handler: forget the connection and its channel,
   * logging at error level when isFatalAMQPError(err) is truthy and at
   * debug level otherwise.
   * @param {Error=} err close reason
   */
  _eventAMQPConnectionClose(err) {
    const _scope = _fileScope('_eventConnectionClose');
    this.connection = undefined;
    this.channel = undefined;
    const level = isFatalAMQPError(err) ? 'error' : 'debug';
    // Dispatch through this.logger so loggers whose methods rely on `this`
    // keep their receiver.
    this.logger[level](_scope, 'connection close event', { err });
  }


  /**
   * Connection 'error' handler: log only.
   * @param {Error} err error
   */
  _eventAMQPConnectionError(err) {
    const _scope = _fileScope('_eventConnectionError');
    this.logger.error(_scope, 'connection error event', { err });
  }


  /**
   * Connection 'blocked' handler: log only.
   * @param {*} reason reason supplied with the event
   */
  _eventAMQPConnectionBlocked(reason) {
    const _scope = _fileScope('_eventAMQPConnectionBlocked');
    this.logger.debug(_scope, 'connection blocked event', { reason });
  }


  /**
   * Connection 'unblocked' handler: log only.
   */
  _eventAMQPConnectionUnblocked() {
    const _scope = _fileScope('_eventAMQPConnectionUnblocked');
    this.logger.debug(_scope, 'connection unblocked event');
  }


  /**
   * Create a confirm channel on the current connection, store it on
   * this.channel, and attach the channel event loggers.
   * @returns {Promise<void>}
   */
  async _establishAMQPChannel() {
    const _scope = _fileScope('_establishChannel');
    try {
      this.channel = await this.connection.createConfirmChannel();
      this.channel.on('close', this._eventAMQPChannelClose.bind(this));
      this.channel.on('error', this._eventAMQPChannelError.bind(this));
      this.channel.on('return', this._eventAMQPChannelReturn.bind(this));
      this.channel.on('drain', this._eventAMQPChannelDrain.bind(this));
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e });
      throw e;
    }
  }


  /**
   * Channel 'close' handler: forget the channel, logging at error level
   * only when an error is supplied and isFatalAMQPError(err) is truthy.
   * @param {Error=} err close reason
   */
  _eventAMQPChannelClose(err) {
    const _scope = _fileScope('_eventChannelClose');
    this.channel = undefined;
    const level = (err && isFatalAMQPError(err)) ? 'error' : 'debug';
    // Dispatch through this.logger so loggers whose methods rely on `this`
    // keep their receiver.
    this.logger[level](_scope, 'channel close event', { err });
  }


  /**
   * Channel 'error' handler: log only.
   * @param {Error} err error
   */
  _eventAMQPChannelError(err) {
    const _scope = _fileScope('_eventChannelError');
    this.logger.error(_scope, 'channel error event', { err });
  }


  /**
   * Channel 'return' handler: log the returned message as an error.
   * @param {object} msg returned message
   */
  _eventAMQPChannelReturn(msg) {
    const _scope = _fileScope('_eventChannelReturn');
    this.logger.error(_scope, 'channel return event', { msg });
  }


  /**
   * Channel 'drain' handler: log only.
   */
  _eventAMQPChannelDrain() {
    const _scope = _fileScope('_eventChannelDrain');
    this.logger.debug(_scope, 'channel drain event');
  }


  /**
   * @param {string} name name
   * @returns {string} '<prefix>.<name>', or just '<name>' when no prefix is set
   */
  _exchangeName(name) {
    const { prefix } = this.options;
    return [
      ...(prefix ? [prefix] : []),
      name,
    ].join('.');
  }


  /**
   * @param {string} name name
   * @returns {string} '<prefix>.<name>.<retrySuffix>'
   */
  _retryExchangeName(name) {
    const { prefix, retrySuffix } = this.options;
    return [
      ...(prefix ? [prefix] : []),
      name,
      retrySuffix,
    ].join('.');
  }


  /**
   * @param {string} name name
   * @returns {string} '<prefix>.<name>.<queueSuffix>'
   */
  _queueName(name) {
    const { prefix, queueSuffix } = this.options;
    return [
      ...(prefix ? [prefix] : []),
      name,
      queueSuffix,
    ].join('.');
  }


  /**
   * @param {string} name name
   * @returns {string} '<prefix>.<name>.<retrySuffix>.<queueSuffix>'
   */
  _retryQueueName(name) {
    const { prefix, retrySuffix, queueSuffix } = this.options;
    return [
      ...(prefix ? [prefix] : []),
      name,
      retrySuffix,
      queueSuffix,
    ].join('.');
  }


  /**
   * Build the pattern used by policyCommand to match retry queues.
   * Every dot-separated segment of name becomes '.*', and segments are
   * joined with an escaped dot. The prefix and suffixes are inserted
   * verbatim, without any escaping.
   * @param {string=} name name
   * @returns {string} pattern
   */
  _retryQueuePolicyName(name) {
    const { prefix, retrySuffix, queueSuffix } = this.options;
    const wildcards = (name || '').split('.').map(() => '.*');
    return [
      ...(prefix ? [prefix] : []),
      ...wildcards,
      retrySuffix,
      queueSuffix,
    ].join('\\.');
  }


  /**
   * Assert the exchange, retry exchange, queue, and retry queue for name,
   * then bind each queue to its exchange with an empty routing key.
   * The queue dead-letters to the retry exchange, and the retry queue
   * dead-letters back to the main exchange.
   * On failure the connection is closed and the original error rethrown.
   *
   * Note:
   * - RabbitMQ does not currently support creating quorum queues with message-ttl.
   * - amqplib does not provide an API to set a ttl policy on the retry queue
   * @see policyCommand
   * @param {string} name name
   * @returns {Promise<void>}
   */
  async establishAMQPPlumbing(name) {
    const _scope = _fileScope('establishAMQPPlumbing');

    const exchangeName = this._exchangeName(name);
    const retryExchangeName = this._retryExchangeName(name);
    const queueName = this._queueName(name);
    const retryQueueName = this._retryQueueName(name);
    const { exchangeType, queueType } = this.options;

    const exchangeOptions = {
      durable: true, // Exchanges are permanent
    };

    const queueOptions = {
      arguments: {
        'x-queue-type': queueType, // quorum for fault-tolerance and data-safety
        'x-dead-letter-exchange': retryExchangeName,
      },
      durable: true, // Queues are permanent
      noAck: false, // Queues expect explicit acknowledgement of delivery
    };

    const queueRetryOptions = {
      arguments: {
        'x-queue-type': queueType,
        'x-dead-letter-exchange': exchangeName,
        // No 'x-message-ttl' here: it cannot currently be set on quorum
        // queues, so the retry delay has to come from a policy instead.
      },
      durable: true,
      noAck: false,
    };

    this.logger.debug(_scope, 'plumbing', { exchangeName, retryExchangeName, queueName, retryQueueName });

    try {
      const [ exchange, retryExchange, queue, retryQueue ] = await Promise.all([
        this.channel.assertExchange(exchangeName, exchangeType, exchangeOptions),
        this.channel.assertExchange(retryExchangeName, exchangeType, exchangeOptions),
        this.channel.assertQueue(queueName, queueOptions),
        this.channel.assertQueue(retryQueueName, queueRetryOptions),
      ]);
      const [ queueBinding, retryQueueBinding ] = await Promise.all([
        this.channel.bindQueue(queueName, exchangeName, ''),
        this.channel.bindQueue(retryQueueName, retryExchangeName, ''),
      ]);
      this.logger.debug(_scope, 'plumbing asserted', { exchange, retryExchange, queue, retryQueue, queueBinding, retryQueueBinding });
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, name });
      try {
        await this.close();
      } catch (closeError) { // eslint-disable-line no-unused-vars
        // close() has already logged its own failure; the caller needs
        // the error which caused the plumbing to fail, not the cleanup one.
      }
      throw e;
    }
  }


  /**
   * Shutdown the channel and connection.
   * The connection is closed even when the channel teardown fails; the
   * failure is logged and rethrown.
   * @returns {Promise<void>}
   */
  async close() {
    const _scope = _fileScope('close');
    this.logger.debug(_scope, 'called');

    try {
      try {
        if (this.channel) {
          // NOTE(review): recover() presumably asks the broker to requeue
          // unacknowledged deliveries before the channel goes away — confirm
          // against the amqplib channel API.
          await this.channel.recover();
          await this.channel.close();
        }
      } finally {
        // Always attempt this, otherwise a channel failure would leave the
        // connection open.
        if (this.connection) {
          await this.connection.close();
        }
      }
      this.logger.debug(_scope, 'closed');
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e });
      throw e;
    }
  }


  /**
   * Minimal health-check, connection is writable.
   * @returns {boolean} status
   */
  health() {
    return Boolean(this?.connection?.connection?.stream?.writable);
  }


  /**
   * Generate an example cli command to create a retry-queue policy,
   * applying options.retryDelayMs as 'message-ttl' to queues matching the
   * retry-queue pattern for name.
   * @param {string} name name
   * @returns {string} policy command
   */
  policyCommand(name) {
    const settings = {
      'message-ttl': this.options.retryDelayMs,
    };
    const pattern = this._retryQueuePolicyName(name);
    return [
      'rabbitmqctl',
      'set_policy',
      'RetryQueueTTL',
      `'${pattern}'`,
      `'${JSON.stringify(settings)}'`,
      '--apply-to queues',
    ].join(' ');
  }

}
307
308 module.exports = Base;