+ /**
+ * Process messages from proffered ticket queue.
+ * Attempt to redeem ticket and publish to redeemed token queue.
+ * @param {AMQPChannel} channel
+ * @param {Buffer} message
+ */
+ async queuedTicketProcessor(channel, message) {
+ const _scope = _fileScope('queuedTicketProcessor');
+
+ const queueName = this.options.queues.ticketRedeemedName;
+ let payload, ticket, resource, subject, iss;
+ try {
+ payload = message.content.toString();
+
+ this.logger.debug(_scope, 'processing ticket', { payload });
+ ({
+ ticket,
+ resource,
+ subject,
+ iss,
+ } = JSON.parse(payload));
+ } catch (e) {
+ this.logger.error(_scope, 'could not parse message, discarding', { error: e, message });
+ channel.ack(message);
+ return;
+ }
+
+ let issuerUrlObj;
+ try {
+ if (iss) {
+ issuerUrlObj = new URL(iss);
+ }
+ } catch (e) {
+ this.logger.debug(_scope, 'unparsable issuer, falling back to resource discovery', { error: e, payload });
+ }
+
+ let resourceUrlObj;
+ try {
+ resourceUrlObj = new URL(resource);
+ } catch (e) {
+ this.logger.error(_scope, 'unparsable resource, discarding', { payload });
+ channel.ack(message);
+ return;
+ }
+
+ let isNotRetryable = false;
+ try {
+ await this.db.context(async (dbCtx) => {
+
+ let token;
+ try {
+ token = await this.communication.redeemTicket(ticket, resourceUrlObj, issuerUrlObj);
+ isNotRetryable = true; // assume we cannot redeem a ticket more than once
+ this.logger.debug(_scope, 'successfully redeemed ticket', { token, payload });
+ channel.ack(message);
+ } catch (e) {
+ this.logger.error(_scope, 'failed to redeem ticket', { error: e, payload });
+ throw e;
+ }
+
+ // persist our redemption
+ const redeemedData = {
+ subject,
+ resource,
+ iss,
+ ticket,
+ token,
+ };
+ await this.db.ticketRedeemed(dbCtx, redeemedData);
+
+ try {
+ const result = await this.queuePublisher.publish(queueName, redeemedData);
+ this.logger.info(_scope, 'published ticket token', { queueName, ticket, resource, subject, iss, result });
+ } catch (e) {
+ this.logger.error(_scope, 'failed to publish token to queue', { error: e, queueName, ticket, token, resource, subject });
+ throw e; // return a 500
+ }
+
+ await this.db.ticketTokenPublished(dbCtx, redeemedData);
+
+ }); // dbCtx
+
+ } catch (e) {
+ isNotRetryable = isNotRetryable
+ || (e instanceof CommunicationErrors.ValidationError)
+ || (e?.response?.statusCode < 500)
+ ;
+ if (isNotRetryable) {
+ this.logger.error(_scope, 'failed to process ticket, not requeuing', { error: e, payload });
+ channel.ack(message);
+ return;
+ }
+ this.logger.error(_scope, 'failed to process ticket, requeuing', { error: e, payload });
+ throw e;
+ }
+ }
+
+