X-Git-Url: http://git.squeep.com/?p=squeep-amqp-helper;a=blobdiff_plain;f=lib%2Fpublisher.js;fp=lib%2Fpublisher.js;h=453b575734ff6a83529689681c716e376e8f9649;hp=0000000000000000000000000000000000000000;hb=174280d3f44ba13dac0b26d42d968189a4f4fa93;hpb=67905316ada5ee4668306506705f4ee2a5f407f0 diff --git a/lib/publisher.js b/lib/publisher.js new file mode 100644 index 0000000..453b575 --- /dev/null +++ b/lib/publisher.js @@ -0,0 +1,113 @@ +'use strict'; +const Base = require('./base'); +const common = require('./common'); + +const _fileScope = common.fileScope(__filename); + +class Publisher extends Base { + constructor(logger, options) { + super(logger, options); + + this.keepSending = true; + this.drainQueue = []; + } + + static publishDefaults = { + persistent: true, + }; + + /** + * Send a message to an exchange. + * @param {String} name + * @param {*} content + * @param {Object} options + */ + async publish(name, content, options) { + const _scope = _fileScope('publish'); + + const exchangeName = this._exchangeName(name); + + if (!(content instanceof Buffer)) { + content = Buffer.from(typeof(content) === 'object' ? JSON.stringify(content) : content); + } + + const timestamp = (new Date()).getTime(); + options = Object.assign({ timestamp }, Publisher.publishDefaults, options); + + return new Promise((resolve, reject) => { + if (!this.keepSending) { + this.drainQueue.push({ exchangeName, content, options }); + this.logger.debug(_scope, 'queued until drain', { exchangeName, queueLength: this.drainQueue.length }); + return resolve(false); + } + this.keepSending = this.channel.publish(this._exchangeName(name), '', content, options, (err, ok) => { + if (err) { + this.logger.error(_scope, 'channel publish failed', { exchangeName, err }); + return reject(err); + } + this.logger.debug(_scope, 'success', { exchangeName, options, ok }); + return resolve(ok); + }); + }); + } + + /** + * Resume publishing after channel has drained, sending any pending + * messages. + */ + _eventAMQPChannelDrain() { + const _scope = _fileScope('_eventChannelDrain'); + this.logger.debug(_scope, 'channel drain event', { queueLength: this.drainQueue.length }); + this.keepSending = true; + this._publishDrainQueue(); + } + + /** + * Attempt to send any pending messages, similar to publish but without + * async behavior. + */ + _publishDrainQueue() { + const _scope = _fileScope('publishDrainQueue'); + while (this.keepSending && this.drainQueue.length) { + const { exchangeName, content, options } = this.drainQueue.shift(); + this.keepSending = this.channel.publish(exchangeName, '', content, options, (err, ok) => { + if (err) { + this.logger.error(_scope, 'channel publish failed', { err, exchangeName, options }); + return; + } + this.logger.debug(_scope, 'success', { exchangeName, options, ok }); + }); + } + } + + /** + * Producer only needs to plumb the exchange it sends to. + * + * @param {String} name + */ + async establishAMQPPlumbing(name) { + const _scope = _fileScope('establishAMQPPlumbing'); + + const exchangeOptions = { + durable: true, // Exchanges are permanent + }; + + const exchangeName = this._exchangeName(name); + + this.logger.debug(_scope, 'plumbing', { exchangeName }); + + try { + const [ exchange ] = await Promise.all([ + this.channel.assertExchange(exchangeName, this.options.exchangeType, exchangeOptions), + ]); + this.logger.debug(_scope, 'plumbing asserted', { exchange }); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, name }); + await this.close(); + throw e; + } + } + +} + +module.exports = Publisher; \ No newline at end of file