X-Git-Url: http://git.squeep.com/?p=squeep-indie-auther;a=blobdiff_plain;f=src%2Fmanager.js;fp=src%2Fmanager.js;h=c79c7529670b8b83dba45f55ccb824716bd427c6;hp=2b64ed16b14f50558b614b2b33afc2ca4f2145e9;hb=726cd980f0ed5588cfe8cbb2d994d5e4aef6e292;hpb=3c145bc590577c738df4d279744f3e2f9a250294 diff --git a/src/manager.js b/src/manager.js index 2b64ed1..c79c752 100644 --- a/src/manager.js +++ b/src/manager.js @@ -8,7 +8,7 @@ const Template = require('./template'); const { MysteryBox } = require('@squeep/mystery-box'); const DBErrors = require('./db/errors'); const Chores = require('./chores'); -const { Publisher: QueuePublisher } = require('@squeep/amqp-helper'); +const { Publisher: QueuePublisher, Consumer: QueueConsumer } = require('@squeep/amqp-helper'); const _fileScope = common.fileScope(__filename); @@ -23,11 +23,12 @@ class Manager { this.options = options; this.logger = logger; this.db = db; - this.chores = new Chores(logger, db, options); this.communication = new Communication(logger, options); if (options.queues.amqp.url) { this.queuePublisher = new QueuePublisher(logger, options.queues.amqp); + this.queueConsumer = new QueueConsumer(logger, options.queues.amqp); } + this.chores = new Chores(logger, db, this.queuePublisher, options); this.mysteryBox = new MysteryBox(options); this.mysteryBox.on('statistics', common.mysteryBoxLogger(logger, _fileScope(this.constructor.name))); @@ -51,6 +52,12 @@ class Manager { async _connectQueues() { await this.queuePublisher.connect(); await this.queuePublisher.establishAMQPPlumbing(this.options.queues.ticketPublishName); + await this.queuePublisher.establishAMQPPlumbing(this.options.queues.ticketRedeemedName); + + await this.queueConsumer.connect(); + await this.queueConsumer.establishAMQPPlumbing(this.options.queues.ticketPublishName); + const boundTicketProcessor = this.queuedTicketProcessor.bind(this); + await this.queueConsumer.consume(this.options.queues.ticketPublishName, boundTicketProcessor); } @@ -578,7 +585,7 @@ class Manager { /** * Get up-to-date profile data from selected profile endpoint. * @param {Object} ctx - * @returns {Object} + * @returns {Promise} */ async _fetchConsentProfileData(ctx) { const _scope = _fileScope('_fetchConsentProfileData'); @@ -1188,7 +1195,7 @@ class Manager { * @param {String} payload.identifier user generating ticket * @param {} payload.profile profile of user generating ticket * @param {Number} payload.ticketLifespanSeconds ticket redeemable for this long - * @returns {String} + * @returns {Promise} */ async _mintTicket({ subject, resource, scopes, identifier, profile, ticketLifespanSeconds }) { const _scope = _fileScope('_mintTicket'); @@ -1222,7 +1229,7 @@ class Manager { /** * * @param {String} ticket - * @returns {Ticket} + * @returns {Promise} */ async _unpackTicket(ticket) { const ticketObj = await this.mysteryBox.unpack(ticket); @@ -1309,7 +1316,7 @@ class Manager { this.logger.debug(_scope, 'called', { ctx }); await this._restoreSessionFromCode(ctx); - await this._checkSessionMatchingRedirectUri(ctx); + this._checkSessionMatchingRedirectUri(ctx); if (ctx.session.error) { throw new ResponseError(Enum.ErrorResponse.BadRequest); @@ -1510,8 +1517,16 @@ class Manager { } const queueName = this.options.queues.ticketPublishName; - const { ticket, resource, subject } = ctx.parsedBody; + const { ticket, resource, subject, iss } = ctx.parsedBody; + if (iss) { + try { + new URL(iss); + } catch (e) { + this.logger.debug(_scope, 'unparsable issuer', { ticket, resource, subject, iss, ctx }); + // continue, will try resource for metadata + } + } try { new URL(resource); } catch (e) { @@ -1527,7 +1542,7 @@ class Manager { } try { - const result = await this.queuePublisher.publish(queueName, { ticket, resource, subject, epochMs: Date.now() }); + const result = await this.queuePublisher.publish(queueName, { ticket, resource, subject, iss, epochMs: Date.now() }); this.logger.debug(_scope, 'accepted ticket offer', { queueName, ticket, resource, subject, ctx, result }); } catch (e) { this.logger.error(_scope, 'failed to publish ticket to queue', { error: e, queueName, ticket, resource, subject, ctx }); @@ -1541,6 +1556,104 @@ class Manager { } + /** + * Process messages from proffered ticket queue. + * Attempt to redeem ticket and publish to redeemed token queue. + * @param {AMQPChannel} channel + * @param {Buffer} message + */ + async queuedTicketProcessor(channel, message) { + const _scope = _fileScope('queuedTicketProcessor'); + + const queueName = this.options.queues.ticketRedeemedName; + let payload, ticket, resource, subject, iss; + try { + payload = message.content.toString(); + + this.logger.debug(_scope, 'processing ticket', { payload }); + ({ + ticket, + resource, + subject, + iss, + } = JSON.parse(payload)); + } catch (e) { + this.logger.error(_scope, 'could not parse message, discarding', { error: e, message }); + channel.ack(message); + return; + } + + let issuerUrlObj; + try { + if (iss) { + issuerUrlObj = new URL(iss); + } + } catch (e) { + this.logger.debug(_scope, 'unparsable issuer, falling back to resource discovery', { error: e, payload }); + } + + let resourceUrlObj; + try { + resourceUrlObj = new URL(resource); + } catch (e) { + this.logger.error(_scope, 'unparsable resource, discarding', { payload }); + channel.ack(message); + return; + } + + let isNotRetryable = false; + try { + await this.db.context(async (dbCtx) => { + + let token; + try { + token = await this.communication.redeemTicket(ticket, resourceUrlObj, issuerUrlObj); + isNotRetryable = true; // assume we cannot redeem a ticket more than once + this.logger.debug(_scope, 'successfully redeemed ticket', { token, payload }); + channel.ack(message); + } catch (e) { + this.logger.error(_scope, 'failed to redeem ticket', { error: e, payload }); + throw e; + } + + // persist our redemption + const redeemedData = { + subject, + resource, + iss, + ticket, + token, + }; + await this.db.ticketRedeemed(dbCtx, redeemedData); + + try { + const result = await this.queuePublisher.publish(queueName, redeemedData); + this.logger.info(_scope, 'published ticket token', { queueName, ticket, resource, subject, iss, result }); + } catch (e) { + this.logger.error(_scope, 'failed to publish token to queue', { error: e, queueName, ticket, token, resource, subject }); + throw e; // return a 500 + } + + await this.db.ticketTokenPublished(dbCtx, redeemedData); + + }); // dbCtx + + } catch (e) { + isNotRetryable = isNotRetryable + || (e instanceof CommunicationErrors.ValidationError) + || (e?.response?.statusCode < 500) + ; + if (isNotRetryable) { + this.logger.error(_scope, 'failed to process ticket, not requeuing', { error: e, payload }); + channel.ack(message); + return; + } + this.logger.error(_scope, 'failed to process ticket, requeuing', { error: e, payload }); + throw e; + } + } + + /** * Validate a token and return data about it. * @param {http.ServerResponse} res @@ -1933,9 +2046,10 @@ class Manager { this.logger.debug(_scope, 'ticket created', { ctx, ticketData, subjectData }); try { - const result = await this.communication.deliverTicket(ctx.ticketEndpointUrl, ctx.ticketResourceUrl, ctx.ticketSubjectUrl, ticket); + const issuerUrl = new URL(this.options.dingus.selfBaseUrl); + const result = await this.communication.deliverTicket(ctx.ticketEndpointUrl, ctx.ticketResourceUrl, ctx.ticketSubjectUrl, ticket, issuerUrl); ctx.notifications.push(`Success! Ticket was delivered. (${result?.statusMessage})`); - this.logger.info(_scope, 'ticket delivered', { ctx, result }); + this.logger.info(_scope, 'ticket delivered', { ctx }); } catch (e) { this.logger.error(_scope, 'failed to deliver ticket', { ctx, error: e }); ctx.errors.push(`Failed to deliver ticket. (${e})`); @@ -2017,4 +2131,4 @@ class Manager { } -module.exports = Manager; \ No newline at end of file +module.exports = Manager;