bump package version to 1.0.1
[squeep-amqp-helper] / lib / publisher.js
1 'use strict';
2 const Base = require('./base');
3 const { fileScope } = require('@squeep/log-helper');
4
5 const _fileScope = fileScope(__filename);
6
7 class Publisher extends Base {
8 constructor(logger, options) {
9 super(logger, options);
10
11 this.keepSending = true;
12 this.drainQueue = [];
13 }
14
15 static publishDefaults = {
16 persistent: true,
17 };
18
19 /**
20 * @typedef {object} PublishOptions
21 * @property {number} timestamp timestamp in ms, defaults to current
22 * @property {boolean} persistent defaults to true
23 */
24 /**
25 * Send a message to an exchange.
26 * @param {string} name name
27 * @param {*} content message content
28 * @param {PublishOptions} options options
29 * @returns {Promise<void>}
30 */
31 async publish(name, content, options) {
32 const _scope = _fileScope('publish');
33
34 const exchangeName = this._exchangeName(name);
35
36 if (!(content instanceof Buffer)) {
37 content = Buffer.from(typeof(content) === 'object' ? JSON.stringify(content) : content);
38 }
39
40 const timestamp = Date.now();
41 options = {
42 timestamp,
43 ...Publisher.publishDefaults,
44 ...options,
45 };
46
47 return new Promise((resolve, reject) => {
48 if (!this.keepSending) {
49 this.drainQueue.push({ exchangeName, content, options });
50 this.logger.debug(_scope, 'queued until drain', { exchangeName, queueLength: this.drainQueue.length });
51 return resolve(false);
52 }
53 this.keepSending = this.channel.publish(this._exchangeName(name), '', content, options, (err, ok) => {
54 if (err) {
55 this.logger.error(_scope, 'channel publish failed', { exchangeName, err });
56 return reject(err);
57 }
58 this.logger.debug(_scope, 'success', { exchangeName, options, ok });
59 return resolve(ok);
60 });
61 });
62 }
63
64 /**
65 * Resume publishing after channel has drained, sending any pending
66 * messages.
67 */
68 _eventAMQPChannelDrain() {
69 const _scope = _fileScope('_eventChannelDrain');
70 this.logger.debug(_scope, 'channel drain event', { queueLength: this.drainQueue.length });
71 this.keepSending = true;
72 this._publishDrainQueue();
73 }
74
75 /**
76 * Attempt to send any pending messages, similar to publish but without
77 * async behavior.
78 */
79 _publishDrainQueue() {
80 const _scope = _fileScope('publishDrainQueue');
81 while (this.keepSending && this.drainQueue.length) {
82 const { exchangeName, content, options } = this.drainQueue.shift();
83 this.keepSending = this.channel.publish(exchangeName, '', content, options, (err, ok) => {
84 if (err) {
85 this.logger.error(_scope, 'channel publish failed', { err, exchangeName, options });
86 return;
87 }
88 this.logger.debug(_scope, 'success', { exchangeName, options, ok });
89 });
90 }
91 }
92
93 /**
94 * Producer only needs to plumb the exchange it sends to.
95 * @param {string} name name
96 */
97 async establishAMQPPlumbing(name) {
98 const _scope = _fileScope('establishAMQPPlumbing');
99
100 const exchangeOptions = {
101 durable: true, // Exchanges are permanent
102 };
103
104 const exchangeName = this._exchangeName(name);
105
106 this.logger.debug(_scope, 'plumbing', { exchangeName });
107
108 try {
109 const [ exchange ] = await Promise.all([
110 this.channel.assertExchange(exchangeName, this.options.exchangeType, exchangeOptions),
111 ]);
112 this.logger.debug(_scope, 'plumbing asserted', { exchange });
113 } catch (e) {
114 this.logger.error(_scope, 'failed', { error: e, name });
115 await this.close();
116 throw e;
117 }
118 }
119
120 }
121
122 module.exports = Publisher;