initial commit
[squeep-amqp-helper] / lib / publisher.js
1 'use strict';
2 const Base = require('./base');
3 const common = require('./common');
4
5 const _fileScope = common.fileScope(__filename);
6
/**
 * Publishes messages to an AMQP exchange, holding messages back while the
 * channel signals back-pressure and flushing them once it drains.
 *
 * `this.logger`, `this.options`, `this.channel`, `this._exchangeName()` and
 * `this.close()` are not defined in this class; they are expected to be
 * provided by Base.
 * NOTE(review): the callback argument and boolean return value used with
 * `channel.publish()` look like an amqplib confirm channel -- confirm in Base.
 */
class Publisher extends Base {
  /**
   * @param {Object} logger passed through to Base
   * @param {Object} options passed through to Base
   */
  constructor(logger, options) {
    super(logger, options);

    // Set from the return value of channel.publish(); while false, new
    // messages are queued instead of written. Reset by the drain handler.
    this.keepSending = true;
    // Messages accepted while keepSending was false, oldest first.
    this.drainQueue = [];
  }

  // Applied to every publish; caller-supplied options take precedence.
  static publishDefaults = {
    persistent: true,
  };

  /**
   * Convert message content into the Buffer handed to the channel.
   * Buffers pass through untouched and strings are encoded as-is. Every
   * other value is JSON-encoded first, because Buffer.from() rejects
   * numbers and booleans. Values JSON cannot represent (e.g. undefined)
   * still cause Buffer.from() to throw.
   * @param {*} content
   * @returns {Buffer}
   */
  static _contentToBuffer(content) {
    if (content instanceof Buffer) {
      return content;
    }
    return Buffer.from(typeof content === 'string' ? content : JSON.stringify(content));
  }

  /**
   * Send a message to an exchange.
   * @param {String} name exchange name, mapped through _exchangeName()
   * @param {*} content Buffer or string sent as-is, anything else as JSON
   * @param {Object} options merged over a fresh timestamp and publishDefaults
   * @returns {Promise<*>} resolves `false` when the message was only queued
   *   to await a drain, otherwise resolves the `ok` value passed to the
   *   channel's publish callback; rejects with that callback's error
   */
  async publish(name, content, options) {
    const _scope = _fileScope('publish');

    const exchangeName = this._exchangeName(name);
    const payload = Publisher._contentToBuffer(content);
    const publishOptions = {
      timestamp: Date.now(),
      ...Publisher.publishDefaults,
      ...options,
    };

    return new Promise((resolve, reject) => {
      if (!this.keepSending) {
        this.drainQueue.push({ exchangeName, content: payload, options: publishOptions });
        this.logger.debug(_scope, 'queued until drain', { exchangeName, queueLength: this.drainQueue.length });
        return resolve(false);
      }
      this.keepSending = this.channel.publish(exchangeName, '', payload, publishOptions, (err, ok) => {
        if (err) {
          this.logger.error(_scope, 'channel publish failed', { exchangeName, err });
          return reject(err);
        }
        this.logger.debug(_scope, 'success', { exchangeName, options: publishOptions, ok });
        return resolve(ok);
      });
    });
  }

  /**
   * Resume publishing after channel has drained, sending any pending
   * messages.
   */
  _eventAMQPChannelDrain() {
    const _scope = _fileScope('_eventChannelDrain');
    this.logger.debug(_scope, 'channel drain event', { queueLength: this.drainQueue.length });
    this.keepSending = true;
    this._publishDrainQueue();
  }

  /**
   * Attempt to send any pending messages, similar to publish but without
   * async behavior. Stops early if the channel reports back-pressure again,
   * leaving the remainder queued. Failures are only logged: the publish()
   * call that queued the message has already resolved `false`.
   */
  _publishDrainQueue() {
    const _scope = _fileScope('publishDrainQueue');
    while (this.keepSending && this.drainQueue.length) {
      const { exchangeName, content, options } = this.drainQueue.shift();
      this.keepSending = this.channel.publish(exchangeName, '', content, options, (err, ok) => {
        if (err) {
          this.logger.error(_scope, 'channel publish failed', { err, exchangeName, options });
          return;
        }
        this.logger.debug(_scope, 'success', { exchangeName, options, ok });
      });
    }
  }

  /**
   * Producer only needs to plumb the exchange it sends to.
   * Asserts a durable exchange of type `this.options.exchangeType`. On
   * failure the error is logged, close() is awaited, and the error is
   * rethrown.
   *
   * @param {String} name exchange name, mapped through _exchangeName()
   */
  async establishAMQPPlumbing(name) {
    const _scope = _fileScope('establishAMQPPlumbing');

    const exchangeOptions = {
      durable: true, // Exchanges are permanent
    };

    const exchangeName = this._exchangeName(name);

    this.logger.debug(_scope, 'plumbing', { exchangeName });

    try {
      const exchange = await this.channel.assertExchange(exchangeName, this.options.exchangeType, exchangeOptions);
      this.logger.debug(_scope, 'plumbing asserted', { exchange });
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, name });
      await this.close();
      throw e;
    }
  }

}
112
113 module.exports = Publisher;