const { MysteryBox } = require('@squeep/mystery-box');
const DBErrors = require('./db/errors');
const Chores = require('./chores');
-const { Publisher: QueuePublisher } = require('@squeep/amqp-helper');
+const { Publisher: QueuePublisher, Consumer: QueueConsumer } = require('@squeep/amqp-helper');
const _fileScope = common.fileScope(__filename);
this.options = options;
this.logger = logger;
this.db = db;
- this.chores = new Chores(logger, db, options);
this.communication = new Communication(logger, options);
if (options.queues.amqp.url) {
this.queuePublisher = new QueuePublisher(logger, options.queues.amqp);
+ this.queueConsumer = new QueueConsumer(logger, options.queues.amqp);
}
+ this.chores = new Chores(logger, db, this.queuePublisher, options);
this.mysteryBox = new MysteryBox(options);
this.mysteryBox.on('statistics', common.mysteryBoxLogger(logger, _fileScope(this.constructor.name)));
async _connectQueues() {
await this.queuePublisher.connect();
await this.queuePublisher.establishAMQPPlumbing(this.options.queues.ticketPublishName);
+ await this.queuePublisher.establishAMQPPlumbing(this.options.queues.ticketRedeemedName);
+
+ await this.queueConsumer.connect();
+ await this.queueConsumer.establishAMQPPlumbing(this.options.queues.ticketPublishName);
+ const boundTicketProcessor = this.queuedTicketProcessor.bind(this);
+ await this.queueConsumer.consume(this.options.queues.ticketPublishName, boundTicketProcessor);
}
/**
* Get up-to-date profile data from selected profile endpoint.
* @param {Object} ctx
- * @returns {Object}
+ * @returns {Promise<Object>}
*/
async _fetchConsentProfileData(ctx) {
const _scope = _fileScope('_fetchConsentProfileData');
* @param {String} payload.identifier user generating ticket
* @param {} payload.profile profile of user generating ticket
* @param {Number} payload.ticketLifespanSeconds ticket redeemable for this long
- * @returns {String}
+ * @returns {Promise<String>}
*/
async _mintTicket({ subject, resource, scopes, identifier, profile, ticketLifespanSeconds }) {
const _scope = _fileScope('_mintTicket');
/**
*
* @param {String} ticket
- * @returns {Ticket}
+ * @returns {Promise<Ticket>}
*/
async _unpackTicket(ticket) {
const ticketObj = await this.mysteryBox.unpack(ticket);
this.logger.debug(_scope, 'called', { ctx });
await this._restoreSessionFromCode(ctx);
- await this._checkSessionMatchingRedirectUri(ctx);
+ this._checkSessionMatchingRedirectUri(ctx);
if (ctx.session.error) {
throw new ResponseError(Enum.ErrorResponse.BadRequest);
}
const queueName = this.options.queues.ticketPublishName;
- const { ticket, resource, subject } = ctx.parsedBody;
+ const { ticket, resource, subject, iss } = ctx.parsedBody;
+ if (iss) {
+ try {
+ new URL(iss);
+ } catch (e) {
+ this.logger.debug(_scope, 'unparsable issuer', { ticket, resource, subject, iss, ctx });
+ // continue, will try resource for metadata
+ }
+ }
try {
new URL(resource);
} catch (e) {
}
try {
- const result = await this.queuePublisher.publish(queueName, { ticket, resource, subject, epochMs: Date.now() });
+ const result = await this.queuePublisher.publish(queueName, { ticket, resource, subject, iss, epochMs: Date.now() });
this.logger.debug(_scope, 'accepted ticket offer', { queueName, ticket, resource, subject, ctx, result });
} catch (e) {
this.logger.error(_scope, 'failed to publish ticket to queue', { error: e, queueName, ticket, resource, subject, ctx });
}
+ /**
+ * Process messages from proffered ticket queue.
+ * Attempt to redeem ticket and publish to redeemed token queue.
+ * @param {AMQPChannel} channel
+ * @param {Buffer} message
+ */
+ async queuedTicketProcessor(channel, message) {
+ const _scope = _fileScope('queuedTicketProcessor');
+
+ const queueName = this.options.queues.ticketRedeemedName;
+ let payload, ticket, resource, subject, iss;
+ try {
+ payload = message.content.toString();
+
+ this.logger.debug(_scope, 'processing ticket', { payload });
+ ({
+ ticket,
+ resource,
+ subject,
+ iss,
+ } = JSON.parse(payload));
+ } catch (e) {
+ this.logger.error(_scope, 'could not parse message, discarding', { error: e, message });
+ channel.ack(message);
+ return;
+ }
+
+ let issuerUrlObj;
+ try {
+ if (iss) {
+ issuerUrlObj = new URL(iss);
+ }
+ } catch (e) {
+ this.logger.debug(_scope, 'unparsable issuer, falling back to resource discovery', { error: e, payload });
+ }
+
+ let resourceUrlObj;
+ try {
+ resourceUrlObj = new URL(resource);
+ } catch (e) {
+ this.logger.error(_scope, 'unparsable resource, discarding', { payload });
+ channel.ack(message);
+ return;
+ }
+
+ let isNotRetryable = false;
+ try {
+ await this.db.context(async (dbCtx) => {
+
+ let token;
+ try {
+ token = await this.communication.redeemTicket(ticket, resourceUrlObj, issuerUrlObj);
+ isNotRetryable = true; // assume we cannot redeem a ticket more than once
+ this.logger.debug(_scope, 'successfully redeemed ticket', { token, payload });
+ channel.ack(message);
+ } catch (e) {
+ this.logger.error(_scope, 'failed to redeem ticket', { error: e, payload });
+ throw e;
+ }
+
+ // persist our redemption
+ const redeemedData = {
+ subject,
+ resource,
+ iss,
+ ticket,
+ token,
+ };
+ await this.db.ticketRedeemed(dbCtx, redeemedData);
+
+ try {
+ const result = await this.queuePublisher.publish(queueName, redeemedData);
+ this.logger.info(_scope, 'published ticket token', { queueName, ticket, resource, subject, iss, result });
+ } catch (e) {
+ this.logger.error(_scope, 'failed to publish token to queue', { error: e, queueName, ticket, token, resource, subject });
+ throw e; // return a 500
+ }
+
+ await this.db.ticketTokenPublished(dbCtx, redeemedData);
+
+ }); // dbCtx
+
+ } catch (e) {
+ isNotRetryable = isNotRetryable
+ || (e instanceof CommunicationErrors.ValidationError)
+ || (e?.response?.statusCode < 500)
+ ;
+ if (isNotRetryable) {
+ this.logger.error(_scope, 'failed to process ticket, not requeuing', { error: e, payload });
+ channel.ack(message);
+ return;
+ }
+ this.logger.error(_scope, 'failed to process ticket, requeuing', { error: e, payload });
+ throw e;
+ }
+ }
+
+
/**
* Validate a token and return data about it.
* @param {http.ServerResponse} res
this.logger.debug(_scope, 'ticket created', { ctx, ticketData, subjectData });
try {
- const result = await this.communication.deliverTicket(ctx.ticketEndpointUrl, ctx.ticketResourceUrl, ctx.ticketSubjectUrl, ticket);
+ const issuerUrl = new URL(this.options.dingus.selfBaseUrl);
+ const result = await this.communication.deliverTicket(ctx.ticketEndpointUrl, ctx.ticketResourceUrl, ctx.ticketSubjectUrl, ticket, issuerUrl);
ctx.notifications.push(`Success! Ticket was delivered. (${result?.statusMessage})`);
- this.logger.info(_scope, 'ticket delivered', { ctx, result });
+ this.logger.info(_scope, 'ticket delivered', { ctx });
} catch (e) {
this.logger.error(_scope, 'failed to deliver ticket', { ctx, error: e });
ctx.errors.push(`Failed to deliver ticket. (${e})`);
}
-module.exports = Manager;
\ No newline at end of file
+module.exports = Manager;