initial commit
[squeep-amqp-helper] / lib / base.js
1 'use strict';
2 const amqp = require('amqplib');
3 const { isFatalError: isFatalAMQPError } = require('amqplib/lib/connection');
4 const common = require('./common');
5
6 const _fileScope = common.fileScope(__filename);
7
/**
 * Base class common to worker publisher and consumer, handling
 * the AMQP setup: the connection, a confirm channel, and the
 * exchange/queue plumbing with its dead-letter retry loop.
 */
class Base {
  /**
   * @param {Console} logger object providing `debug` and `error` methods
   * @param {Object=} options
   * @param {String} options.url passed to amqplib `connect`
   * @param {String=} options.name defaults to 'messages' (not read by this class; presumably used by subclasses)
   * @param {String=} options.prefix first segment of every generated name, omitted if empty (default 'squeep')
   * @param {String=} options.queueSuffix last segment of queue names (default 'queue')
   * @param {String=} options.retrySuffix segment marking retry exchanges and queues (default 'retry')
   * @param {String=} options.exchangeType type used when asserting exchanges (default 'direct')
   * @param {String=} options.queueType value of the 'x-queue-type' queue argument (default 'quorum')
   * @param {Number=} options.retryDelayMs message-ttl used in the generated retry policy (default 10000)
   * @param {Number=} options.prefetch defaults to 1 (not read by this class; presumably used by subclasses)
   * @param {Object=} options.socketOptions passed to amqplib `connect`
   * @param {Boolean=} options.socketOptions.noDelay
   * @param {Number=} options.socketOptions.timeout
   * @param {Boolean=} options.socketOptions.keepAlive
   * @param {Number=} options.socketOptions.keepAliveDelay
   * @param {Object=} options.socketOptions.clientProperties
   */
  constructor(logger, options) {
    this.logger = logger;
    this.options = Object.assign({
      url: undefined,
      name: 'messages',
      prefix: 'squeep',
      queueSuffix: 'queue',
      retrySuffix: 'retry',
      exchangeType: 'direct',
      queueType: 'quorum',
      retryDelayMs: 10000,
      prefetch: 1,
    }, options);
    // Optional chaining: `options` may be omitted entirely, which
    // Object.assign above tolerates but a plain property read would not.
    this.options.socketOptions = Object.assign({
      noDelay: undefined,
      timeout: undefined,
      keepAlive: undefined,
      keepAliveDelay: undefined,
      clientProperties: undefined,
    }, options?.socketOptions);

    this.connection = undefined;
    this.channel = undefined;
  }


  /**
   * Establish the AMQP connection and channel.
   * On failure, whatever was established is closed again and the
   * original error is rethrown.
   */
  async connect() {
    const _scope = _fileScope('connect');
    this.logger.debug(_scope, 'called');

    try {
      await this._connectAMQP();
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e });
      await this._closeAfterFailure(_scope);
      throw e;
    }
  }

  /**
   * Connect, then open a channel on that connection.
   */
  async _connectAMQP() {
    await this._establishAMQPConnection();
    await this._establishAMQPChannel();
  }

  /**
   * Best-effort cleanup used by error paths: a failure while closing is
   * logged but not thrown, so the caller can rethrow the error which
   * actually caused the cleanup.
   * @param {*} _scope logging scope of the caller
   */
  async _closeAfterFailure(_scope) {
    try {
      await this.close();
    } catch (closeError) {
      this.logger.error(_scope, 'close after failure also failed', { error: closeError });
    }
  }

  /**
   * Open the connection and subscribe to its lifecycle events.
   */
  async _establishAMQPConnection() {
    const _scope = _fileScope('_establishConnection');
    const { url, socketOptions } = this.options;
    try {
      this.connection = await amqp.connect(url, socketOptions);
      this.connection.on('close', this._eventAMQPConnectionClose.bind(this));
      this.connection.on('error', this._eventAMQPConnectionError.bind(this));
      this.connection.on('blocked', this._eventAMQPConnectionBlocked.bind(this));
      this.connection.on('unblocked', this._eventAMQPConnectionUnblocked.bind(this));
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e });
      throw e;
    }
  }

  /**
   * Connection closed: forget the connection and its channel.
   * Logged as an error only when a fatal error was supplied.
   * @param {Error=} err
   */
  _eventAMQPConnectionClose(err) {
    const _scope = _fileScope('_eventConnectionClose');
    this.connection = undefined;
    this.channel = undefined;
    // Same guard as the channel close handler: a close without an error is not a failure.
    const isFatal = err && isFatalAMQPError(err);
    // Invoke through the logger so methods relying on `this` keep working.
    const level = isFatal ? 'error' : 'debug';
    this.logger[level](_scope, 'connection close event', { err });
  }

  /**
   * @param {Error} err
   */
  _eventAMQPConnectionError(err) {
    const _scope = _fileScope('_eventConnectionError');
    this.logger.error(_scope, 'connection error event', { err });
  }

  /**
   * @param {*} reason
   */
  _eventAMQPConnectionBlocked(reason) {
    const _scope = _fileScope('_eventAMQPConnectionBlocked');
    this.logger.debug(_scope, 'connection blocked event', { reason });
  }

  _eventAMQPConnectionUnblocked() {
    const _scope = _fileScope('_eventAMQPConnectionUnblocked');
    this.logger.debug(_scope, 'connection unblocked event');
  }

  /**
   * Open a confirm channel on the current connection and subscribe
   * to its lifecycle events.
   */
  async _establishAMQPChannel() {
    const _scope = _fileScope('_establishChannel');
    try {
      this.channel = await this.connection.createConfirmChannel();
      this.channel.on('close', this._eventAMQPChannelClose.bind(this));
      this.channel.on('error', this._eventAMQPChannelError.bind(this));
      this.channel.on('return', this._eventAMQPChannelReturn.bind(this));
      this.channel.on('drain', this._eventAMQPChannelDrain.bind(this));
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e });
      throw e;
    }
  }

  /**
   * Channel closed: forget the channel.
   * Logged as an error only when a fatal error was supplied.
   * @param {Error=} err
   */
  _eventAMQPChannelClose(err) {
    const _scope = _fileScope('_eventChannelClose');
    this.channel = undefined;
    const isFatal = err && isFatalAMQPError(err);
    // Invoke through the logger so methods relying on `this` keep working.
    const level = isFatal ? 'error' : 'debug';
    this.logger[level](_scope, 'channel close event', { err });
  }

  /**
   * @param {Error} err
   */
  _eventAMQPChannelError(err) {
    const _scope = _fileScope('_eventChannelError');
    this.logger.error(_scope, 'channel error event', { err });
  }

  /**
   * @param {*} msg
   */
  _eventAMQPChannelReturn(msg) {
    const _scope = _fileScope('_eventChannelReturn');
    this.logger.error(_scope, 'channel return event', { msg });
  }

  _eventAMQPChannelDrain() {
    const _scope = _fileScope('_eventChannelDrain');
    this.logger.debug(_scope, 'channel drain event');
  }

  /**
   * Prepend the configured prefix, when there is one, to a list of name segments.
   * @param {...String} parts
   * @returns {String[]}
   */
  _nameParts(...parts) {
    const { prefix } = this.options;
    return prefix ? [prefix, ...parts] : parts;
  }

  /**
   * @param {String} name
   * @returns {String} e.g. 'squeep.messages'
   */
  _exchangeName(name) {
    return this._nameParts(name).join('.');
  }

  /**
   * @param {String} name
   * @returns {String} e.g. 'squeep.messages.retry'
   */
  _retryExchangeName(name) {
    return this._nameParts(name, this.options.retrySuffix).join('.');
  }

  /**
   * @param {String} name
   * @returns {String} e.g. 'squeep.messages.queue'
   */
  _queueName(name) {
    return this._nameParts(name, this.options.queueSuffix).join('.');
  }

  /**
   * @param {String} name
   * @returns {String} e.g. 'squeep.messages.retry.queue'
   */
  _retryQueueName(name) {
    const { retrySuffix, queueSuffix } = this.options;
    return this._nameParts(name, retrySuffix, queueSuffix).join('.');
  }

  /**
   * Pattern for a policy covering retry queues: each dot-separated
   * segment of `name` becomes a '.*' wildcard, and segments are joined
   * by an escaped dot.
   * @param {String=} name
   * @returns {String} e.g. 'squeep\..*\.retry\.queue'
   */
  _retryQueuePolicyName(name) {
    const { retrySuffix, queueSuffix } = this.options;
    const wildcards = (name || '').split('.').map(() => '.*');
    return this._nameParts(...wildcards, retrySuffix, queueSuffix).join('\\.');
  }

  /**
   * Assert the exchange, retry exchange, queue, and retry queue for
   * `name`, and bind each queue to its exchange. The two queues
   * dead-letter into each other's exchange, forming the retry loop.
   * On failure the channel and connection are closed and the original
   * error is rethrown.
   *
   * Note:
   * - RabbitMQ does not currently support creating quorum queues with message-ttl.
   * - amqplib does not provide an API to set a ttl policy on the retry queue
   * @see {@link policyCommand}
   *
   * @param {String} name
   */
  async establishAMQPPlumbing(name) {
    const _scope = _fileScope('establishAMQPPlumbing');

    const exchangeName = this._exchangeName(name);
    const retryExchangeName = this._retryExchangeName(name);
    const queueName = this._queueName(name);
    const retryQueueName = this._retryQueueName(name);

    const exchangeOptions = {
      durable: true, // Exchanges are permanent
    };

    const queueOptions = {
      arguments: {
        'x-queue-type': this.options.queueType, // quorum for fault-tolerance and data-safety
        'x-dead-letter-exchange': retryExchangeName,
      },
      durable: true, // Queues are permanent
      noAck: false, // Queues expect explicit acknowledgement of delivery
    };

    const queueRetryOptions = {
      arguments: {
        'x-queue-type': this.options.queueType,
        'x-dead-letter-exchange': exchangeName,
        // 'x-message-ttl' cannot currently be set on quorum queues;
        // the retry delay has to be defined by a policy instead.
      },
      durable: true,
      noAck: false,
    };

    this.logger.debug(_scope, 'plumbing', { exchangeName, retryExchangeName, queueName, retryQueueName });

    try {
      const [ exchange, retryExchange, queue, retryQueue ] = await Promise.all([
        this.channel.assertExchange(exchangeName, this.options.exchangeType, exchangeOptions),
        this.channel.assertExchange(retryExchangeName, this.options.exchangeType, exchangeOptions),
        this.channel.assertQueue(queueName, queueOptions),
        this.channel.assertQueue(retryQueueName, queueRetryOptions),
      ]);
      const [ queueBinding, retryQueueBinding ] = await Promise.all([
        this.channel.bindQueue(queueName, exchangeName, ''),
        this.channel.bindQueue(retryQueueName, retryExchangeName, ''),
      ]);
      this.logger.debug(_scope, 'plumbing asserted', { exchange, retryExchange, queue, retryQueue, queueBinding, retryQueueBinding });
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, name });
      await this._closeAfterFailure(_scope);
      throw e;
    }
  }

  /**
   * Shutdown the channel and connection.
   * The connection is closed even when shutting down the channel fails.
   * Any failure is logged and rethrown.
   */
  async close() {
    const _scope = _fileScope('close');
    this.logger.debug(_scope, 'called');

    try {
      try {
        if (this.channel) {
          await this.channel.recover();
          await this.channel.close();
        }
      } finally {
        // A failed channel shutdown must not leave the connection open.
        if (this.connection) {
          await this.connection.close();
        }
      }
      this.logger.debug(_scope, 'closed');
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e });
      throw e;
    }
  }

  /**
   * Minimal health-check, connection is writable.
   * @returns {Boolean}
   */
  health() {
    return !!this?.connection?.connection?.stream?.writable;
  }

  /**
   * Generate an example cli command to create a retry-queue policy,
   * applying `options.retryDelayMs` as the message-ttl.
   * @param {String=} name
   * @returns {String}
   */
  policyCommand(name) {
    const settings = {
      'message-ttl': this.options.retryDelayMs,
    };
    return [
      'rabbitmqctl',
      'set_policy',
      'RetryQueueTTL',
      `'${this._retryQueuePolicyName(name)}'`,
      `'${JSON.stringify(settings)}'`,
      '--apply-to queues',
    ].join(' ');
  }

}
299
300 module.exports = Base;