use log-helper fileScope, minor lint fix
[squeep-amqp-helper] / lib / base.js
1 'use strict';
2 const amqp = require('amqplib');
3 const { isFatalError: isFatalAMQPError } = require('amqplib/lib/connection');
4 const { fileScope } = require('@squeep/log-helper');
5
6 const _fileScope = fileScope(__filename);
7
8 /**
9 * Base class common to worker publisher and consumer, handling
10 * the AMQP setup.
11 */
12
13 class Base {
14 /**
15 * @param {Console} logger
16 * @param {Object} options
17 * @param {String} options.url
18 * @param {Number=} options.prefetch
19 * @param {Object=} options.socketOptions
20 * @param {Boolean=} options.socketOptions.noDelay
21 * @param {Number=} options.socketOptions.timeout
22 * @param {Boolean=} options.socketOptions.keepAlive
23 * @param {Number=} options.socketOptions.keepAliveDelay
24 * @param {Object=} options.socketOptions.clientProperties
25 */
26 constructor(logger, options) {
27 this.logger = logger;
28 this.options = {
29 url: undefined,
30 name: 'messages',
31 prefix: 'squeep',
32 queueSuffix: 'queue',
33 retrySuffix: 'retry',
34 exchangeType: 'direct',
35 queueType: 'quorum',
36 retryDelayMs: 10000,
37 prefetch: 1,
38 ...options,
39 };
40 this.options.socketOptions = {
41 noDelay: undefined,
42 timeout: undefined,
43 keepAlive: undefined,
44 keepAliveDelay: undefined,
45 clientProperties: undefined,
46 ...options.socketOptions,
47 };
48
49 this.connection = undefined;
50 this.channel = undefined;
51 }
52
53
54 /**
55 * Establish the necessary connections to the queue.
56 */
57 async connect() {
58 const _scope = _fileScope('connect');
59 this.logger.debug(_scope, 'called');
60
61 try {
62 await this._connectAMQP();
63 } catch (e) {
64 this.logger.error(_scope, 'failed', { error: e });
65 await this.close();
66 throw e;
67 }
68 }
69
70 async _connectAMQP() {
71 await this._establishAMQPConnection();
72 await this._establishAMQPChannel();
73 }
74
75 async _establishAMQPConnection() {
76 const _scope = _fileScope('_establishConnection');
77 const { url, socketOptions } = this.options;
78 try {
79 this.connection = await amqp.connect(url, socketOptions);
80 this.connection.on('close', this._eventAMQPConnectionClose.bind(this));
81 this.connection.on('error', this._eventAMQPConnectionError.bind(this));
82 this.connection.on('blocked', this._eventAMQPConnectionBlocked.bind(this));
83 this.connection.on('unblocked', this._eventAMQPConnectionUnblocked.bind(this));
84 } catch (e) {
85 this.logger.error(_scope, 'failed', { error: e });
86 throw e;
87 }
88 }
89
90 _eventAMQPConnectionClose(err) {
91 const _scope = _fileScope('_eventConnectionClose');
92 this.connection = undefined;
93 this.channel = undefined;
94 const isFatal = isFatalAMQPError(err);
95 const _log = isFatal ? this.logger.error : this.logger.debug;
96 _log(_scope, 'connection close event', { err });
97 }
98
99 _eventAMQPConnectionError(err) {
100 const _scope = _fileScope('_eventConnectionError');
101 this.logger.error(_scope, 'connection error event', { err });
102 }
103
104 _eventAMQPConnectionBlocked(reason) {
105 const _scope = _fileScope('_eventAMQPConnectionBlocked');
106 this.logger.debug(_scope, 'connection blocked event', { reason });
107 }
108
109 _eventAMQPConnectionUnblocked() {
110 const _scope = _fileScope('_eventAMQPConnectionUnblocked');
111 this.logger.debug(_scope, 'connection unblocked event');
112 }
113
114 async _establishAMQPChannel() {
115 const _scope = _fileScope('_establishChannel');
116 try {
117 this.channel = await this.connection.createConfirmChannel();
118 this.channel.on('close', this._eventAMQPChannelClose.bind(this));
119 this.channel.on('error', this._eventAMQPChannelError.bind(this));
120 this.channel.on('return', this._eventAMQPChannelReturn.bind(this));
121 this.channel.on('drain', this._eventAMQPChannelDrain.bind(this));
122 } catch (e) {
123 this.logger.error(_scope, 'failed', { error: e });
124 throw e;
125 }
126 }
127
128 _eventAMQPChannelClose(err) {
129 const _scope = _fileScope('_eventChannelClose');
130 this.channel = undefined;
131 const isFatal = err && isFatalAMQPError(err);
132 const _log = isFatal ? this.logger.error : this.logger.debug;
133 _log(_scope, 'channel close event', { err });
134 }
135
136 _eventAMQPChannelError(err) {
137 const _scope = _fileScope('_eventChannelError');
138 this.logger.error(_scope, 'channel error event', { err });
139 }
140
141 _eventAMQPChannelReturn(msg) {
142 const _scope = _fileScope('_eventChannelReturn');
143 this.logger.error(_scope, 'channel return event', { msg });
144 }
145
146 _eventAMQPChannelDrain() {
147 const _scope = _fileScope('_eventChannelDrain');
148 this.logger.debug(_scope, 'channel drain event');
149 }
150
151 _exchangeName(name) {
152 return [
153 ...(this.options.prefix && [this.options.prefix] || []),
154 name,
155 ].join('.');
156 }
157
158 _retryExchangeName(name) {
159 return [
160 ...(this.options.prefix && [this.options.prefix] || []),
161 name,
162 this.options.retrySuffix,
163 ].join('.');
164 }
165
166 _queueName(name) {
167 return [
168 ...(this.options.prefix && [this.options.prefix] || []),
169 name,
170 this.options.queueSuffix,
171 ].join('.');
172 }
173
174 _retryQueueName(name) {
175 return [
176 ...(this.options.prefix && [this.options.prefix] || []),
177 name,
178 this.options.retrySuffix,
179 this.options.queueSuffix,
180 ].join('.');
181 }
182
183 _retryQueuePolicyName(name) {
184 return [
185 ...(this.options.prefix && [this.options.prefix] || []),
186 ...((name || '').split('.').map(() => '.*')),
187 this.options.retrySuffix,
188 this.options.queueSuffix,
189 ].join('\\.');
190 }
191
192 /**
193 * Note:
194 * - RabbitMQ does not currently support creating quorum queues with message-ttl.
195 * - amqplib does not provide an API to set a ttl policy on the retry queue
196 * @see {@link policyCommand}
197 *
198 * @param {String} name
199 */
200 async establishAMQPPlumbing(name) {
201 const _scope = _fileScope('establishAMQPPlumbing');
202
203 const exchangeOptions = {
204 durable: true, // Exchanges are permanent
205 };
206
207 const queueOptions = {
208 arguments: {
209 'x-queue-type': this.options.queueType, // quorum for fault-tolerance and data-safety
210 'x-dead-letter-exchange': this._retryExchangeName(name),
211 },
212 durable: true, // Queues are permanent
213 noAck: false, // Queues expect explicit acknowledgement of delivery
214 };
215
216 const queueRetryOptions = {
217 arguments: {
218 'x-queue-type': this.options.queueType,
219 'x-dead-letter-exchange': this._exchangeName(name),
220 // 'x-message-ttl': workerRetryDelayMs, // Cannot currently set on quorum queues, define a policy instead. (No api for that??)
221 },
222 durable: true,
223 noAck: false,
224 };
225
226 const exchangeName = this._exchangeName(name);
227 const retryExchangeName = this._retryExchangeName(name);
228 const queueName = this._queueName(name);
229 const retryQueueName = this._retryQueueName(name);
230
231 this.logger.debug(_scope, 'plumbing', { exchangeName, retryExchangeName, queueName, retryQueueName });
232
233 try {
234 const [ exchange, retryExchange, queue, retryQueue ] = await Promise.all([
235 this.channel.assertExchange(exchangeName, this.options.exchangeType, exchangeOptions),
236 this.channel.assertExchange(retryExchangeName, this.options.exchangeType, exchangeOptions),
237 this.channel.assertQueue(queueName, queueOptions),
238 this.channel.assertQueue(retryQueueName, queueRetryOptions),
239 ]);
240 const [ queueBinding, retryQueueBinding ] = await Promise.all([
241 this.channel.bindQueue(queueName, exchangeName, ''),
242 this.channel.bindQueue(retryQueueName, retryExchangeName, ''),
243 ]);
244 this.logger.debug(_scope, 'plumbing asserted', { exchange, retryExchange, queue, retryQueue, queueBinding, retryQueueBinding });
245 } catch (e) {
246 this.logger.error(_scope, 'failed', { error: e, name });
247 await this.close();
248 throw e;
249 }
250 }
251
252 /**
253 * Shutdown the channel and connection.
254 */
255 async close() {
256 const _scope = _fileScope('close');
257 this.logger.debug(_scope, 'called');
258
259 try {
260 if (this.channel) {
261 await this.channel.recover();
262 await this.channel.close();
263 }
264
265 if (this.connection) {
266 await this.connection.close();
267 }
268 this.logger.debug(_scope, 'closed');
269 } catch (e) {
270 this.logger.error(_scope, 'failed', { error: e });
271 throw e;
272 }
273 }
274
275 /**
276 * Minimal health-check, connection is writable.
277 * @returns {Boolean}
278 */
279 health() {
280 return !!this?.connection?.connection?.stream?.writable;
281 }
282
283 /**
284 * Generate an example cli command to create a retry-queue policy.
285 */
286 policyCommand(name) {
287 const settings = {
288 'message-ttl': this.options.retryDelayMs,
289 };
290 return [
291 'rabbitmqctl',
292 'set_policy',
293 'RetryQueueTTL',
294 `'${this._retryQueuePolicyName(name)}'`,
295 `'${JSON.stringify(settings)}'`,
296 '--apply-to queues',
297 ].join(' ');
298 }
299
300 }
301
302 module.exports = Base;