redeem proffered tickets, db schema 1.1.0
[squeep-indie-auther] / src / manager.js
index 2b64ed16b14f50558b614b2b33afc2ca4f2145e9..c79c7529670b8b83dba45f55ccb824716bd427c6 100644 (file)
@@ -8,7 +8,7 @@ const Template = require('./template');
 const { MysteryBox } = require('@squeep/mystery-box');
 const DBErrors = require('./db/errors');
 const Chores = require('./chores');
-const { Publisher: QueuePublisher } = require('@squeep/amqp-helper');
+const { Publisher: QueuePublisher, Consumer: QueueConsumer } = require('@squeep/amqp-helper');
 
 const _fileScope = common.fileScope(__filename);
 
@@ -23,11 +23,12 @@ class Manager {
     this.options = options;
     this.logger = logger;
     this.db = db;
-    this.chores = new Chores(logger, db, options);
     this.communication = new Communication(logger, options);
     if (options.queues.amqp.url) {
       this.queuePublisher = new QueuePublisher(logger, options.queues.amqp);
+      this.queueConsumer = new QueueConsumer(logger, options.queues.amqp);
     }
+    this.chores = new Chores(logger, db, this.queuePublisher, options);
     this.mysteryBox = new MysteryBox(options);
     this.mysteryBox.on('statistics', common.mysteryBoxLogger(logger, _fileScope(this.constructor.name)));
 
@@ -51,6 +52,12 @@ class Manager {
   async _connectQueues() {
     await this.queuePublisher.connect();
     await this.queuePublisher.establishAMQPPlumbing(this.options.queues.ticketPublishName);
+    await this.queuePublisher.establishAMQPPlumbing(this.options.queues.ticketRedeemedName);
+
+    await this.queueConsumer.connect();
+    await this.queueConsumer.establishAMQPPlumbing(this.options.queues.ticketPublishName);
+    const boundTicketProcessor = this.queuedTicketProcessor.bind(this);
+    await this.queueConsumer.consume(this.options.queues.ticketPublishName, boundTicketProcessor);
   }
 
 
@@ -578,7 +585,7 @@ class Manager {
   /**
    * Get up-to-date profile data from selected profile endpoint.
    * @param {Object} ctx
-   * @returns {Object}
+   * @returns {Promise<Object>}
    */
   async _fetchConsentProfileData(ctx) {
     const _scope = _fileScope('_fetchConsentProfileData');
@@ -1188,7 +1195,7 @@ class Manager {
    * @param {String} payload.identifier user generating ticket
    * @param {} payload.profile profile of user generating ticket
    * @param {Number} payload.ticketLifespanSeconds ticket redeemable for this long
-   * @returns {String}
+   * @returns {Promise<String>}
   */
   async _mintTicket({ subject, resource, scopes, identifier, profile, ticketLifespanSeconds }) {
     const _scope = _fileScope('_mintTicket');
@@ -1222,7 +1229,7 @@ class Manager {
   /**
    * 
    * @param {String} ticket
-   * @returns {Ticket}
+   * @returns {Promise<Ticket>}
    */
   async _unpackTicket(ticket) {
     const ticketObj = await this.mysteryBox.unpack(ticket);
@@ -1309,7 +1316,7 @@ class Manager {
     this.logger.debug(_scope, 'called', { ctx });
 
     await this._restoreSessionFromCode(ctx);
-    await this._checkSessionMatchingRedirectUri(ctx);
+    this._checkSessionMatchingRedirectUri(ctx);
 
     if (ctx.session.error) {
       throw new ResponseError(Enum.ErrorResponse.BadRequest);
@@ -1510,8 +1517,16 @@ class Manager {
     }
 
     const queueName = this.options.queues.ticketPublishName;
-    const { ticket, resource, subject } = ctx.parsedBody;
+    const { ticket, resource, subject, iss } = ctx.parsedBody;
     
+    if (iss) {
+      try {
+        new URL(iss);
+      } catch (e) {
+        this.logger.debug(_scope, 'unparsable issuer', { ticket, resource, subject, iss, ctx });
+        // continue, will try resource for metadata
+      }
+    }
     try {
       new URL(resource);
     } catch (e) {
@@ -1527,7 +1542,7 @@ class Manager {
       }
 
       try {
-        const result = await this.queuePublisher.publish(queueName, { ticket, resource, subject, epochMs: Date.now() });
+        const result = await this.queuePublisher.publish(queueName, { ticket, resource, subject, iss, epochMs: Date.now() });
         this.logger.debug(_scope, 'accepted ticket offer', { queueName, ticket, resource, subject, ctx, result });
       } catch (e) {
         this.logger.error(_scope, 'failed to publish ticket to queue', { error: e, queueName, ticket, resource, subject, ctx });
@@ -1541,6 +1556,104 @@ class Manager {
   }
 
 
+  /**
+   * Process messages from proffered ticket queue.
+   * Attempt to redeem ticket and publish to redeemed token queue.
+   * @param {AMQPChannel} channel
+   * @param {Buffer} message
+   */
+  async queuedTicketProcessor(channel, message) {
+    const _scope = _fileScope('queuedTicketProcessor');
+
+    const queueName = this.options.queues.ticketRedeemedName;
+    let payload, ticket, resource, subject, iss;
+    try {
+      payload = message.content.toString();
+
+      this.logger.debug(_scope, 'processing ticket', { payload });
+      ({
+        ticket,
+        resource,
+        subject,
+        iss,
+      } = JSON.parse(payload));
+    } catch (e) {
+      this.logger.error(_scope, 'could not parse message, discarding', { error: e, message });
+      channel.ack(message);
+      return;
+    }
+
+    let issuerUrlObj;
+    try {
+      if (iss) {
+        issuerUrlObj = new URL(iss);
+      }
+    } catch (e) {
+      this.logger.debug(_scope, 'unparsable issuer, falling back to resource discovery', { error: e, payload });
+    }
+
+    let resourceUrlObj;
+    try {
+      resourceUrlObj = new URL(resource);
+    } catch (e) {
+      this.logger.error(_scope, 'unparsable resource, discarding', { payload });
+      channel.ack(message);
+      return;
+    }
+
+    let isNotRetryable = false;
+    try {
+      await this.db.context(async (dbCtx) => {
+
+        let token;
+        try {
+          token = await this.communication.redeemTicket(ticket, resourceUrlObj, issuerUrlObj);
+          isNotRetryable = true; // assume we cannot redeem a ticket more than once
+          this.logger.debug(_scope, 'successfully redeemed ticket', { token, payload });
+          channel.ack(message);
+        } catch (e) {
+          this.logger.error(_scope, 'failed to redeem ticket', { error: e, payload });
+          throw e;
+        }
+
+        // persist our redemption
+        const redeemedData = {
+          subject,
+          resource,
+          iss,
+          ticket,
+          token,
+        };
+        await this.db.ticketRedeemed(dbCtx, redeemedData);
+
+        try {
+          const result = await this.queuePublisher.publish(queueName, redeemedData);
+          this.logger.info(_scope, 'published ticket token', { queueName, ticket, resource, subject, iss, result });
+        } catch (e) {
+          this.logger.error(_scope, 'failed to publish token to queue', { error: e, queueName, ticket, token, resource, subject });
+          throw e; // return a 500
+        }
+
+        await this.db.ticketTokenPublished(dbCtx, redeemedData);
+
+      }); // dbCtx
+
+    } catch (e) {
+      isNotRetryable = isNotRetryable
+        || (e instanceof CommunicationErrors.ValidationError)
+        || (e?.response?.statusCode < 500)
+      ;
+      if (isNotRetryable) {
+        this.logger.error(_scope, 'failed to process ticket, not requeuing', { error: e, payload });
+        channel.ack(message);
+        return;
+      }
+      this.logger.error(_scope, 'failed to process ticket, requeuing', { error: e, payload });
+      throw e;
+    }
+  }
+
+
   /**
    * Validate a token and return data about it.
    * @param {http.ServerResponse} res
@@ -1933,9 +2046,10 @@ class Manager {
             this.logger.debug(_scope, 'ticket created', { ctx, ticketData, subjectData });
 
             try {
-              const result = await this.communication.deliverTicket(ctx.ticketEndpointUrl, ctx.ticketResourceUrl, ctx.ticketSubjectUrl, ticket);
+              const issuerUrl = new URL(this.options.dingus.selfBaseUrl);
+              const result = await this.communication.deliverTicket(ctx.ticketEndpointUrl, ctx.ticketResourceUrl, ctx.ticketSubjectUrl, ticket, issuerUrl);
               ctx.notifications.push(`Success! Ticket was delivered. (${result?.statusMessage})`);
-              this.logger.info(_scope, 'ticket delivered', { ctx, result });
+              this.logger.info(_scope, 'ticket delivered', { ctx });
             } catch (e) {
               this.logger.error(_scope, 'failed to deliver ticket', { ctx, error: e });
               ctx.errors.push(`Failed to deliver ticket. (${e})`);
@@ -2017,4 +2131,4 @@ class Manager {
 
 }
 
-module.exports = Manager;
\ No newline at end of file
+module.exports = Manager;