Merge branch 'v1.3-dev'
[websub-hub] / src / manager.js
index 6fb187cde3dc0da5854727ffbc9297199a752899..a7f9f55b49cc9cbd4e7db76fac5d0c0b6c23889c 100644 (file)
@@ -25,20 +25,16 @@ class Manager {
     this.db = db;
     this.options = options;
     this.communication = new Communication(logger, db, options);
     this.db = db;
     this.options = options;
     this.communication = new Communication(logger, db, options);
-
-    // Precalculate the invariant root GET metadata.
-    this.getRootContent = Template.rootHTML(undefined, options);
-    const now = new Date();
-    this.startTimeString = now.toGMTString();
-    this.startTimeMs = now.getTime();
-    this.getRootETag = common.generateETag(undefined, undefined, this.getRootContent);
   }
 
   }
 
+  /**
+   * @typedef {import('node:http')} http
+   */
 
   /**
    * GET request for healthcheck.
 
   /**
    * GET request for healthcheck.
-   * @param {http.ServerResponse} res
-   * @param {object} ctx
+   * @param {http.ServerResponse} res response
+   * @param {object} ctx context
    */
   async getHealthcheck(res, ctx) {
     const _scope = _fileScope('getHealthcheck');
    */
   async getHealthcheck(res, ctx) {
     const _scope = _fileScope('getHealthcheck');
@@ -53,46 +49,40 @@ class Manager {
 
   /**
    * GET request for root.
 
   /**
    * GET request for root.
-   * @param {http.ServerResponse} res
-   * @param {object} ctx
+   * @param {http.ClientRequest} req request
+   * @param {http.ServerResponse} res response
+   * @param {object} ctx context
    */
   async getRoot(req, res, ctx) {
     const _scope = _fileScope('getRoot');
     this.logger.debug(_scope, 'called', { ctx });
 
    */
   async getRoot(req, res, ctx) {
     const _scope = _fileScope('getRoot');
     this.logger.debug(_scope, 'called', { ctx });
 
-    res.setHeader(Enum.Header.LastModified, this.startTimeString);
-    res.setHeader(Enum.Header.ETag, this.getRootETag);
-
-    if (common.isClientCached(req, this.startTimeMs, this.getRootETag)) {
-      this.logger.debug(_scope, 'client cached response', { ctx });
-      res.statusCode = 304;
-      res.end();
-      return;
-    }
-    res.end(this.getRootContent);
+    const content = Template.rootHTML(ctx, this.options);
+    res.end(content);
     this.logger.info(_scope, 'finished', { ctx });
   }
 
 
     this.logger.info(_scope, 'finished', { ctx });
   }
 
 
-  /** All the fields the root handler deals with.
+  /**
+   * All the fields the root handler deals with.
    * @typedef {object} RootData
    * @typedef {object} RootData
-   * @property {string} callback url
-   * @property {string} mode
-   * @property {string} topic
-   * @property {number} topicId
-   * @property {string} leaseSeconds
-   * @property {string} secret
-   * @property {string} httpRemoteAddr
-   * @property {string} httpFrom
-   * @property {boolean} isSecure
-   * @property {boolean} isPublisherValidated
+   * @property {string} callback url
+   * @property {string} mode mode
+   * @property {string} topic topic
+   * @property {number} topicId topic id
+   * @property {string} leaseSeconds lease seconds
+   * @property {string} secret secret
+   * @property {string} httpRemoteAddr remote address
+   * @property {string} httpFrom from
+   * @property {boolean} isSecure is secure
+   * @property {boolean} isPublisherValidated is published validated
    */
 
   /**
    * Extract api parameters.
    */
 
   /**
    * Extract api parameters.
-   * @param {http.ClientRequest} req 
-   * @param {Object} ctx
-   * @returns {RootData}
+   * @param {http.ClientRequest} req request
+   * @param {object} ctx context
+   * @returns {RootData} root data
    */
   static _getRootData(req, ctx) {
     const postData = ctx.parsedBody;
    */
   static _getRootData(req, ctx) {
     const postData = ctx.parsedBody;
@@ -114,11 +104,12 @@ class Manager {
 
   /**
    * 
 
   /**
    * 
-   * @param {*} dbCtx
-   * @param {RootData} data
-   * @param {String[]} warn
-   * @param {String[]} err
-   * @param {String} requestId
+   * @param {*} dbCtx db context
+   * @param {RootData} data root data
+   * @param {string[]} warn warnings
+   * @param {string[]} err errors
+   * @param {string} requestId request id
+   * @returns {Promise<void>}
    */
   async _validateRootData(dbCtx, data, warn, err, requestId) {
     // These checks can modify data, so order matters.
    */
   async _validateRootData(dbCtx, data, warn, err, requestId) {
     // These checks can modify data, so order matters.
@@ -132,10 +123,12 @@ class Manager {
    * Check that requested topic exists and values are in range.
    * Sets topic id, publisher validation state, and requested lease
    * seconds on data.
    * Check that requested topic exists and values are in range.
    * Sets topic id, publisher validation state, and requested lease
    * seconds on data.
-   * @param {*} dbCtx
-   * @param {RootData} data
-   * @param {String[]} warn
-   * @param {String[]} err
+   * @param {*} dbCtx db context
+   * @param {RootData} data root data
+   * @param {string[]} warn warnings
+   * @param {string[]} err errors
+   * @param {string} requestId request id
+   * @returns {Promise<void>}
    */
   async _checkTopic(dbCtx, data, warn, err, requestId) {
     const _scope = _fileScope('_checkTopic');
    */
   async _checkTopic(dbCtx, data, warn, err, requestId) {
     const _scope = _fileScope('_checkTopic');
@@ -144,12 +137,12 @@ class Manager {
     if (data.topic) {
       topic = await this.db.topicGetByUrl(dbCtx, data.topic);
 
     if (data.topic) {
       topic = await this.db.topicGetByUrl(dbCtx, data.topic);
 
-      if (!topic && this.options.manager.publicHub) {
+      if (!topic && this._newTopicCreationAllowed()) {
         this.logger.info(_scope, 'new topic from subscribe request', { data, requestId });
 
         try {
           new URL(data.topic);
         this.logger.info(_scope, 'new topic from subscribe request', { data, requestId });
 
         try {
           new URL(data.topic);
-        } catch (e) {
+        } catch (e) { // eslint-disable-line no-unused-vars
           err.push('invalid topic url (failed to parse url)');
           return;
         }
           err.push('invalid topic url (failed to parse url)');
           return;
         }
@@ -170,14 +163,12 @@ class Manager {
 
     if (data.leaseSeconds === undefined || isNaN(data.leaseSeconds)) {
       data.leaseSeconds = topic.leaseSecondsPreferred;
 
     if (data.leaseSeconds === undefined || isNaN(data.leaseSeconds)) {
       data.leaseSeconds = topic.leaseSecondsPreferred;
-    } else {
-      if (data.leaseSeconds > topic.leaseSecondsMax) {
-        data.leaseSeconds = topic.leaseSecondsMax;
-        warn.push(`requested lease too long, using ${data.leaseSeconds}`);
-      } else if (data.leaseSeconds < topic.leaseSecondsMin) {
-        data.leaseSeconds = topic.leaseSecondsMin;
-        warn.push(`requested lease too short, using ${data.leaseSeconds}`);
-      }
+    } else if (data.leaseSeconds > topic.leaseSecondsMax) {
+      data.leaseSeconds = topic.leaseSecondsMax;
+      warn.push(`requested lease too long, using ${data.leaseSeconds}`);
+    } else if (data.leaseSeconds < topic.leaseSecondsMin) {
+      data.leaseSeconds = topic.leaseSecondsMin;
+      warn.push(`requested lease too short, using ${data.leaseSeconds}`);
     }
 
     if (topic.publisherValidationUrl) {
     }
 
     if (topic.publisherValidationUrl) {
@@ -188,9 +179,9 @@ class Manager {
 
   /**
    * Check data for valid callback url and scheme constraints.
 
   /**
    * Check data for valid callback url and scheme constraints.
-   * @param {RootData} data
-   * @param {String[]} warn
-   * @param {String[]} err
+   * @param {RootData} data root data
+   * @param {string[]} warn warnings
+   * @param {string[]} err errors
    */
   _checkCallbackAndSecrets(data, warn, err) {
     let isCallbackSecure = false;
    */
   _checkCallbackAndSecrets(data, warn, err) {
     let isCallbackSecure = false;
@@ -201,7 +192,7 @@ class Manager {
       try {
         const c = new URL(data.callback);
         isCallbackSecure = (c.protocol.toLowerCase() === 'https:'); // Colon included because url module is weird
       try {
         const c = new URL(data.callback);
         isCallbackSecure = (c.protocol.toLowerCase() === 'https:'); // Colon included because url module is weird
-      } catch (e) {
+      } catch (e) { // eslint-disable-line no-unused-vars
         err.push('invalid callback url (failed to parse url');
         return;
       }
         err.push('invalid callback url (failed to parse url');
         return;
       }
@@ -228,11 +219,11 @@ class Manager {
   /**
    * Check mode validity and subscription requirements.
    * Publish mode is handled elsewhere in the flow.
   /**
    * Check mode validity and subscription requirements.
    * Publish mode is handled elsewhere in the flow.
-   * @param {*} dbCtx
-   * @param {RootData} data
-   * @param {String[]} warn
-   * @param {String[]} err
-   * @param {String} requestId
+   * @param {*} dbCtx db context
+   * @param {RootData} data root data
+   * @param {string[]} warn warnings
+   * @param {string[]} err errors
+   * @returns {Promise<void>}
    */
   async _checkMode(dbCtx, data, warn, err) {
     switch (data.mode) {
    */
   async _checkMode(dbCtx, data, warn, err) {
     switch (data.mode) {
@@ -247,11 +238,10 @@ class Manager {
         }
         if (s === undefined) {
           err.push('not subscribed');
         }
         if (s === undefined) {
           err.push('not subscribed');
-        } else {
-          if (s.expires < currentEpoch) {
-            err.push('subscription already expired');
-          }
+        } else if (s.expires < currentEpoch) {
+          err.push('subscription already expired');
         }
         }
+        
         break;
       }
 
         break;
       }
 
@@ -263,51 +253,174 @@ class Manager {
 
 
   /**
 
 
   /**
-   * Check that a publish request topic is valid and exists,
-   * and if it is, add topicId to data.
-   * For a public publish request, create topic if not exists.
-   * @param {*} dbCtx
-   * @param {RootData} data
-   * @param {String[]} warn
-   * @param {String[]} err
-   * @param {String} requestId
+   * Determine if a topic url is allowed to be created.
+   * In the future, this may be more complicated.
+   * @returns {boolean} is public hub
+   */
+  _newTopicCreationAllowed() {
+    return this.options.manager.publicHub;
+  }
+
+
+  /**
+   * Check that a publish request's topic(s) are valid and exist,
+   * returning an array with the results for each.
+   * For a public-hub publish request, creates topics if they do not exist.
+   * @param {*} dbCtx db context
+   * @param {RootData} data root data
+   * @param {string} requestId request id
+   * @returns {Promise<object[]>} results
    */
    */
-  async _checkPublish(dbCtx, data, warn, err, requestId) {
+  async _publishTopics(dbCtx, data, requestId) {
     const _scope = _fileScope('_checkPublish');
 
     const _scope = _fileScope('_checkPublish');
 
-    const publishUrl = data.url || data.topic;
+    // Publish requests may include multiple topics, consider them all, but deduplicate.
+    const publishUrls = Array.from(new Set([
+      ...common.ensureArray(data.url),
+      ...common.ensureArray(data.topic),
+    ]));
+
+    // Map the requested topics to their ids, creating if necessary.
+    return Promise.all(publishUrls.map(async (url) => {
+      const result = {
+        url,
+        warn: [],
+        err: [],
+        topicId: undefined,
+      };
+      let topic = await this.db.topicGetByUrl(dbCtx, url);
+      if (!topic && this._newTopicCreationAllowed()) {
+        try {
+          new URL(url);
+        } catch (e) { // eslint-disable-line no-unused-vars
+          result.err.push('invalid topic url (failed to parse url)');
+          return result;
+        }
+        await this.db.topicSet(dbCtx, {
+          // TODO: accept a publisherValidationUrl parameter
+          url,
+        });
+        topic = await this.db.topicGetByUrl(dbCtx, url);
+        this.logger.info(_scope, 'new topic from publish request', { url, requestId });
+      }
+      if (!topic || topic.isDeleted) {
+        result.err.push('topic not supported');
+        return result;
+      }
+      result.topicId = topic.id;
+      return result;
+    }));
+  }
 
 
-    let topic = await this.db.topicGetByUrl(dbCtx, publishUrl);
-    if (!topic && this.options.manager.publicHub) {
-      this.logger.info(_scope, 'new topic from publish request', { data, requestId });
 
 
-      try {
-        new URL(publishUrl);
-      } catch (e) {
-        err.push('invalid topic url (failed to parse url)');
-        return;
+  /**
+   * Render response for multi-topic publish requests.
+   * @param {object} ctx context
+   * @param {object[]} publishTopics topics
+   * @returns {string} response content
+   */
+  static multiPublishContent(ctx, publishTopics) {
+    const responses = publishTopics.map((topic) => ({
+      href: topic.url,
+      status: topic.status,
+      statusMessage: topic.statusMessage,
+      errors: topic.err,
+      warnings: topic.warn,
+    }));
+    switch (ctx.responseType) {
+      case Enum.ContentType.ApplicationJson:
+        return JSON.stringify(responses);
+
+      case Enum.ContentType.TextPlain:
+      default: {
+        const textResponses = responses.map((response) => {
+          const details = Manager._prettyDetails(response.errors, response.warnings);
+          const textDetails = (details.length ? '\n' : '') + details.map((d) => `\t${d}`).join('\n');
+          return `${response.href} [${response.status} ${response.statusMessage}]${textDetails}`;
+        });
+        return textResponses.join('\n----\n');
       }
       }
+    }
+  }
 
 
-      await this.db.topicSet(dbCtx, {
-        url: publishUrl,
-      });
-      topic = await this.db.topicGetByUrl(dbCtx, publishUrl);
+
+  /**
+   * Process a publish request.
+   * @param {*} dbCtx db context
+   * @param {object} data data
+   * @param {http.ServerResponse} res response
+   * @param {object} ctx context
+   */
+  async _publishRequest(dbCtx, data, res, ctx) {
+    const _scope = _fileScope('_parsePublish');
+    this.logger.debug(_scope, 'called', { data });
+
+    const requestId = ctx.requestId;
+
+    // Parse and validate all the topics in the request.
+    data.publishTopics = await this._publishTopics(dbCtx, data, requestId);
+    if (!data?.publishTopics?.length) {
+      const details = Manager._prettyDetails(['no valid topic urls to publish'], []);
+      throw new ResponseError(Enum.ErrorResponse.BadRequest, details);
     }
 
     }
 
-    if (!topic || topic.isDeleted) {
-      err.push('not a supported topic');
-      return;
+    // Set status per topic
+    for (const topicResult of data.publishTopics) {
+      topicResult.status = topicResult.err.length ? 400 : 202;
+      topicResult.statusMessage = topicResult.err.length ? 'Bad Request' : 'Accepted';
     }
 
     }
 
-    data.topicId = topic.id;
+    // Process the valid publish notifications
+    const validPublishTopics = data.publishTopics.filter((topicResult) => !topicResult.err.length);
+    try {
+      await Promise.all(validPublishTopics.map(async (topicResult) => this.db.topicFetchRequested(dbCtx, topicResult.topicId)));
+    } catch (e) {
+      this.logger.error(_scope, 'topicFetchRequest failed', { error: e, ctx, data, requestId });
+      throw e;
+    }
+
+    this.logger.info(_scope, 'request accepted', { ctx, data, requestId });
+
+    if (data.publishTopics.length === 1) {
+      const soleTopic = data.publishTopics[0];
+      res.statusCode = soleTopic.status;
+      res.end(Manager._prettyDetails(soleTopic.err, soleTopic.warn).join('\n'));
+    } else {
+      res.statusCode = 207;
+      res.end(Manager.multiPublishContent(ctx, data.publishTopics));
+    }
+
+    if (this.options.manager.processImmediately
+    &&  validPublishTopics.length) {
+      try {
+        await Promise.all(validPublishTopics.map(async (topicResult) => this.communication.topicFetchClaimAndProcessById(dbCtx, topicResult.topicId, requestId)));
+      } catch (e) { // eslint-disable-line no-unused-vars
+        this.logger.error(_scope, 'topicFetchClaimAndProcessById failed', { data, validPublishTopics, requestId });
+        // Don't bother re-throwing, as we've already ended this response.
+      }
+    }
+  }
+
+
+  /**
+   * Annotate any encountered issues.
+   * @param {string[]} err errors
+   * @param {string[]} warn warnings
+   * @returns {string[]} rendered list of errors and warnings
+   */
+  static _prettyDetails(err, warn) {
+    return [
+      ...err.map((entry) => `error: ${entry}`),
+      ...warn.map((entry) => `warning: ${entry}`),
+    ];
   }
 
 
   /**
    * POST request for root.
   }
 
 
   /**
    * POST request for root.
-   * @param {http.ClientRequest} req
-   * @param {http.ServerResponse} res
-   * @param {object} ctx
+   * @param {http.ClientRequest} req request
+   * @param {http.ServerResponse} res response
+   * @param {object} ctx context
    */
   async postRoot(req, res, ctx) {
     const _scope = _fileScope('postRoot');
    */
   async postRoot(req, res, ctx) {
     const _scope = _fileScope('postRoot');
@@ -322,15 +435,14 @@ class Manager {
 
     await this.db.context(async (dbCtx) => {
 
 
     await this.db.context(async (dbCtx) => {
 
+      // Handle publish requests elsewhere
       if (data.mode === Enum.Mode.Publish) {
       if (data.mode === Enum.Mode.Publish) {
-        await this._checkPublish(dbCtx, data, warn, err, requestId);
-      } else {
-        await this._validateRootData(dbCtx, data, warn, err, requestId);
+        return this._publishRequest(dbCtx, data, res, ctx);
       }
 
       }
 
-      const prettyErr = err.map((entry) => `error: ${entry}`);
-      const prettyWarn = warn.map((entry) => `warning: ${entry}`);
-      const details = prettyErr.concat(prettyWarn);
+      await this._validateRootData(dbCtx, data, warn, err, requestId);
+
+      const details = Manager._prettyDetails(err, warn);
 
       // Any errors are fatal.  Stop and report anything that went wrong.
       if (err.length) {
 
       // Any errors are fatal.  Stop and report anything that went wrong.
       if (err.length) {
@@ -339,18 +451,11 @@ class Manager {
       }
 
       // Commit the request for later processing.
       }
 
       // Commit the request for later processing.
-      let fn, info, id;
+      let id;
       try {
       try {
-        if (data.mode === Enum.Mode.Publish) {
-          fn = 'topicPublish';
-          info = await this.db.topicFetchRequested(dbCtx, data.topicId);
-          id = data.topicId;
-        } else {
-          fn = 'verificationInsert';
-          id = await this.db.verificationInsert(dbCtx, { ...data, requestId });
-        }
+        id = await this.db.verificationInsert(dbCtx, { ...data, requestId });
       } catch (e) {
       } catch (e) {
-        this.logger.error(_scope, `${fn} failed`, { e, info, data, warn, id, requestId });
+        this.logger.error(_scope, 'verificationInsert failed', { error: e, data, warn, id, requestId });
         throw e;
       }
 
         throw e;
       }
 
@@ -362,15 +467,9 @@ class Manager {
       if (this.options.manager.processImmediately
       &&  id) {
         try {
       if (this.options.manager.processImmediately
       &&  id) {
         try {
-          if (data.mode === Enum.Mode.Publish) {
-            fn = 'topicFetchClaimAndProcessById';
-            await this.communication.topicFetchClaimAndProcessById(dbCtx, id, requestId);
-          } else {
-            fn = 'processVerification';
-            await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId);
-          }
-        } catch (e) {
-          this.logger.error(_scope, `${fn} failed`, { ...data, id, requestId });
+          await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId);
+        } catch (e) { // eslint-disable-line no-unused-vars
+          this.logger.error(_scope, 'verificationClaimAndProcessById failed', { ...data, id, requestId });
           // Don't bother re-throwing, as we've already ended this response.
         }
       }
           // Don't bother re-throwing, as we've already ended this response.
         }
       }
@@ -380,15 +479,15 @@ class Manager {
 
   /**
    * Render topic info content.
 
   /**
    * Render topic info content.
-   * @param {Object} ctx
-   * @param {String} ctx.responseType
-   * @param {String} ctx.topicUrl
-   * @param {Number} ctx.count
-   * @returns {String}
+   * @param {object} ctx context
+   * @param {string} ctx.responseType response type
+   * @param {string} ctx.topicUrl topic url
+   * @param {number} ctx.count count of subscribers
+   * @returns {string} response content
    */
   // eslint-disable-next-line class-methods-use-this
   infoContent(ctx) {
    */
   // eslint-disable-next-line class-methods-use-this
   infoContent(ctx) {
-    // eslint-disable-next-line sonarjs/no-small-switch
+     
     switch (ctx.responseType) {
       case Enum.ContentType.ApplicationJson:
         return JSON.stringify({
     switch (ctx.responseType) {
       case Enum.ContentType.ApplicationJson:
         return JSON.stringify({
@@ -407,8 +506,8 @@ class Manager {
 
   /**
    * GET request for /info?topic=url&format=type
 
   /**
    * GET request for /info?topic=url&format=type
-   * @param {http.ServerResponse} res
-   * @param {object} ctx
+   * @param {http.ServerResponse} res response
+   * @param {object} ctx context
    */
   async getInfo(res, ctx) {
     const _scope = _fileScope('getInfo');
    */
   async getInfo(res, ctx) {
     const _scope = _fileScope('getInfo');
@@ -436,7 +535,7 @@ class Manager {
 
     try {
       new URL(ctx.topicUrl);
 
     try {
       new URL(ctx.topicUrl);
-    } catch (e) {
+    } catch (e) { // eslint-disable-line no-unused-vars
       throw new ResponseError(Enum.ErrorResponse.BadRequest, 'invalid topic');
     }
 
       throw new ResponseError(Enum.ErrorResponse.BadRequest, 'invalid topic');
     }
 
@@ -449,15 +548,88 @@ class Manager {
       ctx.count = count.count;
     });
 
       ctx.count = count.count;
     });
 
-    res.end(this.infoContent(ctx));
-    this.logger.info(_scope, 'finished', { ...ctx });
+    const content = this.infoContent(ctx);
+    res.setHeader(Enum.Header.ETag, common.generateETag(undefined, undefined, content));
+    res.setHeader(Enum.Header.CacheControl, 'no-cache');
+    res.end(content);
+    this.logger.info(_scope, 'finished', { ctx });
+  }
+
+
+  /**
+   * label the bars of the topic update history graph
+   * @param {number} index index
+   * @param {number} value value
+   * @returns {string} caption
+   */
+  static _historyBarCaption(index, value) {
+    let when;
+    switch (index) {
+      case 0:
+        when ='today';
+        break;
+      case 1:
+        when = 'yesterday';
+        break;
+      default:
+        when = `${index} days ago`;
+    }
+    return `${when}, ${value || 'no'} update${value === 1 ? '': 's'}`;
+  }
+
+
+  /**
+   * GET SVG chart of topic update history
+   * @param {http.ServerResponse} res response
+   * @param {object} ctx context
+   */
+  async getHistorySVG(res, ctx) {
+    const _scope = _fileScope('getHistorySVG');
+    this.logger.debug(_scope, 'called', { ctx });
+
+    const days = Math.min(parseInt(ctx.queryParams.days) || this.options.manager.publishHistoryDays, 365);
+    const histOptions = {
+      title: 'Topic Publish History',
+      description: 'Updates per Day',
+      labelZero: '^ Today',
+      labelX: 'Days Ago',
+      maxItems: days,
+      minItems: days,
+      tickEvery: 7,
+      barWidth: 25,
+      barHeight: 40,
+      labelHeight: 12,
+      barCaptionFn: Manager._historyBarCaption,
+    };
+
+    let publishHistory;
+    await this.db.context(async (dbCtx) => {
+      publishHistory = await this.db.topicPublishHistory(dbCtx, ctx.params.topicId, days);
+    });
+
+    res.end(Template.histogramSVG(publishHistory, histOptions));
+    this.logger.info(_scope, 'finished', { ctx });
+  }
+
+
+  /**
+   * Determine if a profile url matches enough of a topic url to describe control over it.
+   * Topic must match hostname and start with the profile's path.
+   * @param {URL} profileUrlObj profile url
+   * @param {URL} topicUrlObj topic url
+   * @returns {boolean} profile is super-url of topic
+   */
+  static _profileControlsTopic(profileUrlObj, topicUrlObj) {
+    const hostnameMatches = profileUrlObj.hostname === topicUrlObj.hostname;
+    const pathIsPrefix = topicUrlObj.pathname.startsWith(profileUrlObj.pathname);
+    return hostnameMatches && pathIsPrefix;
   }
 
 
   /**
    * GET request for authorized /admin information.
   }
 
 
   /**
    * GET request for authorized /admin information.
-   * @param {http.ServerResponse} res
-   * @param {object} ctx
+   * @param {http.ServerResponse} res response
+   * @param {object} ctx context
    */
   async getAdminOverview(res, ctx) {
     const _scope = _fileScope('getAdminOverview');
    */
   async getAdminOverview(res, ctx) {
     const _scope = _fileScope('getAdminOverview');
@@ -468,36 +640,62 @@ class Manager {
     });
     this.logger.debug(_scope, 'got topics', { topics: ctx.topics });
 
     });
     this.logger.debug(_scope, 'got topics', { topics: ctx.topics });
 
+    // Profile users can only see related topics.
+    if (ctx?.session?.authenticatedProfile) {
+      const profileUrlObj = new URL(ctx.session.authenticatedProfile);
+      ctx.topics = ctx.topics.filter((topic) => {
+        const topicUrlObj = new URL(topic.url);
+        return Manager._profileControlsTopic(profileUrlObj, topicUrlObj);
+      });
+    }
+
     res.end(Template.adminOverviewHTML(ctx, this.options));
     res.end(Template.adminOverviewHTML(ctx, this.options));
-    this.logger.info(_scope, 'finished', { ...ctx, topics: ctx.topics.length })
+    this.logger.info(_scope, 'finished', { ctx, topics: ctx.topics.length });
   }
 
 
   /**
    * GET request for authorized /admin/topic/:topicId information.
   }
 
 
   /**
    * GET request for authorized /admin/topic/:topicId information.
-   * @param {http.ServerResponse} res
-   * @param {object} ctx
+   * @param {http.ServerResponse} res response
+   * @param {object} ctx context
    */
   async getTopicDetails(res, ctx) {
     const _scope = _fileScope('getTopicDetails');
     this.logger.debug(_scope, 'called', { ctx });
 
    */
   async getTopicDetails(res, ctx) {
     const _scope = _fileScope('getTopicDetails');
     this.logger.debug(_scope, 'called', { ctx });
 
+    ctx.publishSpan = 60; // FIXME: configurable
     const topicId = ctx.params.topicId;
     const topicId = ctx.params.topicId;
+    let publishHistory;
     await this.db.context(async (dbCtx) => {
       ctx.topic = await this.db.topicGetById(dbCtx, topicId);
       ctx.subscriptions = await this.db.subscriptionsByTopicId(dbCtx, topicId);
     await this.db.context(async (dbCtx) => {
       ctx.topic = await this.db.topicGetById(dbCtx, topicId);
       ctx.subscriptions = await this.db.subscriptionsByTopicId(dbCtx, topicId);
+      publishHistory = await this.db.topicPublishHistory(dbCtx, topicId, ctx.publishSpan);
     });
     });
-    this.logger.debug(_scope, 'got topic details', { topic: ctx.topic, subscriptions: ctx.subscriptions });
+    ctx.publishCount = publishHistory.reduce((a, b) => a + b, 0);
+    ctx.subscriptionsDelivered = ctx.subscriptions.filter((subscription) => {
+      return subscription.latestContentDelivered >= ctx.topic.contentUpdated;
+    }).length;
+    this.logger.debug(_scope, 'got topic details', { topic: ctx.topic, subscriptions: ctx.subscriptions, updates: ctx.publishCount });
+
+    // Profile users can only see related topics.
+    if (ctx?.session?.authenticatedProfile) {
+      const profileUrlObj = new URL(ctx.session.authenticatedProfile);
+      const topicUrlObj = new URL(ctx.topic.url);
+      if (!Manager._profileControlsTopic(profileUrlObj, topicUrlObj)) {
+        ctx.topic = null;
+        ctx.subscriptions = [];
+      }
+    }
 
     res.end(Template.adminTopicDetailsHTML(ctx, this.options));
 
     res.end(Template.adminTopicDetailsHTML(ctx, this.options));
-    this.logger.info(_scope, 'finished', { ...ctx, subscriptions: ctx.subscriptions.length, topic: ctx.topic.id });
+    this.logger.info(_scope, 'finished', { ctx, subscriptions: ctx.subscriptions.length, topic: ctx?.topic?.id || ctx.topic });
   }
 
 
   /**
    * PATCH and DELETE for updating topic data.
   }
 
 
   /**
    * PATCH and DELETE for updating topic data.
-   * @param {http.ServerResponse} res
-   * @param {Object} ctx
+   * @param {http.ServerResponse} res response
+   * @param {object} ctx context
    */
   async updateTopic(res, ctx) {
     const _scope = _fileScope('updateTopic');
    */
   async updateTopic(res, ctx) {
     const _scope = _fileScope('updateTopic');
@@ -518,6 +716,8 @@ class Manager {
           await this.db.topicDeleted(txCtx, topicId);
           res.end();
           this.logger.info(_scope, 'topic set deleted', { ctx, topicId });
           await this.db.topicDeleted(txCtx, topicId);
           res.end();
           this.logger.info(_scope, 'topic set deleted', { ctx, topicId });
+          // Attempt to remove from db if no active subscriptions.
+          await this.db.topicPendingDelete(txCtx, topicId);
           return;
         }
 
           return;
         }
 
@@ -578,8 +778,8 @@ class Manager {
 
   /**
    * PATCH and DELETE for updating subscription data.
 
   /**
    * PATCH and DELETE for updating subscription data.
-   * @param {http.ServerResponse} res
-   * @param {Object} ctx
+   * @param {http.ServerResponse} res response
+   * @param {object} ctx context
    */
   async updateSubscription(res, ctx) {
     const _scope = _fileScope('updateSubscription');
    */
   async updateSubscription(res, ctx) {
     const _scope = _fileScope('updateSubscription');
@@ -658,15 +858,17 @@ class Manager {
 
   /**
    * POST request for manually running worker.
 
   /**
    * POST request for manually running worker.
-   * @param {http.ServerResponse} res
-   * @param {object} ctx
+   * @param {http.ServerResponse} res response
+   * @param {object} ctx context
    */
   async processTasks(res, ctx) {
    */
   async processTasks(res, ctx) {
-    const _scope = _fileScope('getTopicDetails');
+    const _scope = _fileScope('processTasks');
     this.logger.debug(_scope, 'called', { ctx });
 
     // N.B. no await on this
     this.logger.debug(_scope, 'called', { ctx });
 
     // N.B. no await on this
-    this.communication.worker.process();
+    this.communication.worker.process().catch((e) => {
+      this.logger.error(_scope, 'failed', { error: e, ctx });
+    });
 
     res.end();
     this.logger.info(_scope, 'invoked worker process', { ctx });
 
     res.end();
     this.logger.info(_scope, 'invoked worker process', { ctx });