use log-helper fileScope, minor lint fix
[squeep-amqp-helper] / lib / publisher.js
1 'use strict';
2 const Base = require('./base');
3 const { fileScope } = require('@squeep/log-helper');
4
5 const _fileScope = fileScope(__filename);
6
/**
 * Publishes messages to AMQP exchanges, holding messages back in a local
 * queue while the channel signals back-pressure.
 *
 * `this.logger`, `this.options`, `this.channel`, `this._exchangeName()` and
 * `this.close()` are not defined in this class; presumably they are provided
 * by Base -- TODO confirm against ./base.
 */
class Publisher extends Base {
  /**
   * @param {Object} logger forwarded to Base
   * @param {Object} options forwarded to Base
   */
  constructor(logger, options) {
    super(logger, options);

    // Last flow-control answer from channel.publish(); reset to true by the
    // drain handler.
    this.keepSending = true;
    // Messages accepted while keepSending was false, oldest first.
    this.drainQueue = [];
  }

  /**
   * Options applied to every published message; caller-supplied options
   * take precedence over these.
   */
  static publishDefaults = {
    persistent: true,
  };

  /**
   * Send a message to an exchange.
   *
   * Buffers are sent as-is, strings are sent as their bytes, objects
   * (including null) are sent as JSON, and numbers, booleans and bigints are
   * sent in their string form. Any other content (e.g. undefined) is
   * rejected by Buffer.from() with a TypeError.
   *
   * @param {String} name passed through this._exchangeName() to get the exchange
   * @param {*} content
   * @param {Object} options merged over a current timestamp and publishDefaults
   * @returns {Promise<*>} false if the message was queued until the channel
   *   drains, otherwise the value the channel hands to the publish callback
   */
  async publish(name, content, options) {
    const _scope = _fileScope('publish');

    const exchangeName = this._exchangeName(name);

    let payload = content;
    if (!(payload instanceof Buffer)) {
      const kind = typeof payload;
      if (kind === 'object') {
        payload = JSON.stringify(payload);
      } else if (kind === 'number' || kind === 'boolean' || kind === 'bigint') {
        // Buffer.from() throws on these primitives, so serialize them first.
        payload = String(payload);
      }
      payload = Buffer.from(payload);
    }

    const publishOptions = {
      timestamp: Date.now(),
      ...Publisher.publishDefaults,
      ...options,
    };

    if (!this.keepSending) {
      // Channel asked us to back off; hold the message until the drain event.
      this.drainQueue.push({ exchangeName, content: payload, options: publishOptions });
      this.logger.debug(_scope, 'queued until drain', { exchangeName, queueLength: this.drainQueue.length });
      return false;
    }

    // Adapt the callback-style channel.publish() to a promise.
    return new Promise((resolve, reject) => {
      this.keepSending = this.channel.publish(exchangeName, '', payload, publishOptions, (err, ok) => {
        if (err) {
          this.logger.error(_scope, 'channel publish failed', { exchangeName, err });
          return reject(err);
        }
        this.logger.debug(_scope, 'success', { exchangeName, options: publishOptions, ok });
        return resolve(ok);
      });
    });
  }

  /**
   * Resume publishing after channel has drained, sending any pending
   * messages.
   */
  _eventAMQPChannelDrain() {
    // NOTE(review): scope label differs from the method name; kept as-is so
    // existing log output does not change.
    const _scope = _fileScope('_eventChannelDrain');
    this.logger.debug(_scope, 'channel drain event', { queueLength: this.drainQueue.length });
    this.keepSending = true;
    this._publishDrainQueue();
  }

  /**
   * Attempt to send any pending messages, similar to publish but without
   * async behavior: failures are only logged, nothing is returned.
   * Stops as soon as the channel signals back-pressure again, leaving the
   * remaining messages queued.
   */
  _publishDrainQueue() {
    // NOTE(review): scope label lacks the method's leading underscore; kept
    // as-is so existing log output does not change.
    const _scope = _fileScope('publishDrainQueue');
    while (this.keepSending && this.drainQueue.length) {
      const { exchangeName, content, options } = this.drainQueue.shift();
      this.keepSending = this.channel.publish(exchangeName, '', content, options, (err, ok) => {
        if (err) {
          this.logger.error(_scope, 'channel publish failed', { err, exchangeName, options });
          return;
        }
        this.logger.debug(_scope, 'success', { exchangeName, options, ok });
      });
    }
  }

  /**
   * Producer only needs to plumb the exchange it sends to.
   *
   * Asserts a durable exchange of type this.options.exchangeType. On failure
   * the error is logged, this.close() is awaited, and the error is rethrown.
   *
   * @param {String} name passed through this._exchangeName() to get the exchange
   * @returns {Promise<void>}
   */
  async establishAMQPPlumbing(name) {
    const _scope = _fileScope('establishAMQPPlumbing');

    const exchangeOptions = {
      durable: true, // Exchanges are permanent
    };

    const exchangeName = this._exchangeName(name);

    this.logger.debug(_scope, 'plumbing', { exchangeName });

    try {
      const exchange = await this.channel.assertExchange(exchangeName, this.options.exchangeType, exchangeOptions);
      this.logger.debug(_scope, 'plumbing asserted', { exchange });
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, name });
      await this.close();
      throw e;
    }
  }

}
116
117 module.exports = Publisher;