Merge branch 'v1.2-dev' as v1.2.0 v1.2.0
authorJustin Wind <justin.wind+git@gmail.com>
Sat, 28 Aug 2021 17:04:08 +0000 (10:04 -0700)
committerJustin Wind <justin.wind+git@gmail.com>
Sat, 28 Aug 2021 17:04:08 +0000 (10:04 -0700)
20 files changed:
CHANGELOG.md
package-lock.json
package.json
src/common.js
src/communication.js
src/db/base.js
src/db/postgres/index.js
src/db/postgres/sql/subscription-delete-expired.sql [new file with mode: 0644]
src/db/postgres/sql/topic-delete-by-id.sql [new file with mode: 0644]
src/db/sqlite/index.js
src/db/sqlite/sql/subscription-delete-expired.sql [new file with mode: 0644]
src/db/sqlite/sql/topic-delete-by-id.sql [new file with mode: 0644]
src/manager.js
test/src/common.js
test/src/communication.js
test/src/db/integration.js
test/src/db/postgres.js
test/src/db/sqlite.js
test/src/manager.js
test/stub-db.js

index 966dede5324bae5c60a68b606f7e693c20367e9f..1b458a4ba7c1ed104bc75dcfe999da2f42eea02d 100644 (file)
@@ -4,6 +4,14 @@ Releases and notable changes to this project are documented here.
 
 ## [Unreleased]
 
+## [v1.2.0] - 2021-08-28
+
+### Added
+
+- Accept multiple topics in publish requests.
+- Expired subscription entries are removed from the database when their topics are updated.
+- Topics which have been marked deleted are removed from the database after all subscribers have been notified.
+
 ## [v1.1.5] - 2021-08-23
 
 ### Fixed
@@ -59,7 +67,8 @@ Releases and notable changes to this project are documented here.
 
 ---
 
-[Unreleased]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=HEAD;hp=v1.1.5
+[Unreleased]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=HEAD;hp=v1.2.0
+[v1.2.0]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=v1.2.0;hp=v1.1.5
 [v1.1.5]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=v1.1.5;hp=v1.1.4
 [v1.1.4]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=v1.1.4;hp=v1.1.3
 [v1.1.3]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=v1.1.3;hp=v1.1.2
index c0aa159da7addbada1820daa688f02d547e167fd..002506d48522ea5e05dcb00b4b7d4b7e0534191a 100644 (file)
@@ -1,6 +1,6 @@
 {
   "name": "websub-hub",
-  "version": "1.1.5",
+  "version": "1.2.0",
   "lockfileVersion": 1,
   "requires": true,
   "dependencies": {
         "domelementtype": "^2.0.1",
         "domhandler": "^4.2.0",
         "entities": "^2.0.0"
+      },
+      "dependencies": {
+        "entities": {
+          "version": "2.2.0",
+          "resolved": "https://registry.npmjs.org/entities/-/entities-2.2.0.tgz",
+          "integrity": "sha512-p92if5Nz619I0w+akJrLZH0MX0Pb5DX39XOwQTtXSdQQOaYH03S1uIQp4mhOZtAXrxq4ViO67YTiLBo2638o9A=="
+        }
       }
     },
     "domelementtype": {
       }
     },
     "entities": {
-      "version": "2.2.0",
-      "resolved": "https://registry.npmjs.org/entities/-/entities-2.2.0.tgz",
-      "integrity": "sha512-p92if5Nz619I0w+akJrLZH0MX0Pb5DX39XOwQTtXSdQQOaYH03S1uIQp4mhOZtAXrxq4ViO67YTiLBo2638o9A=="
+      "version": "3.0.1",
+      "resolved": "https://registry.npmjs.org/entities/-/entities-3.0.1.tgz",
+      "integrity": "sha512-WiyBqoomrwMdFG1e0kqvASYfnlb0lp8M5o5Fw2OFq1hNZxxcNk8Ik0Xm7LxzBhuidnZB/UtBqVCgUz3kBOP51Q=="
     },
     "es6-error": {
       "version": "4.1.1",
       "dev": true
     },
     "htmlparser2": {
-      "version": "6.1.0",
-      "resolved": "https://registry.npmjs.org/htmlparser2/-/htmlparser2-6.1.0.tgz",
-      "integrity": "sha512-gyyPk6rgonLFEDGoeRgQNaEUvdJ4ktTmmUh/h2t7s+M8oPpIPxgNACWa+6ESR57kXstwqPiCut0V8NRpcwgU7A==",
+      "version": "7.0.0",
+      "resolved": "https://registry.npmjs.org/htmlparser2/-/htmlparser2-7.0.0.tgz",
+      "integrity": "sha512-IhdltX9BWhYQft4UPA92jFasNajskja0om6vU0DaIEL4OseCg5zE+mHAMr51AT89TbzzECrQWJ4CZ5NVYTPlKw==",
       "requires": {
         "domelementtype": "^2.0.1",
         "domhandler": "^4.0.0",
         "domutils": "^2.5.2",
-        "entities": "^2.0.0"
+        "entities": "^3.0.1"
       }
     },
     "https-proxy-agent": {
       "integrity": "sha512-gKLcREMhtuZRwRAfqP3RFW+TK4JqApVBtOIftVgjuABpAtpxhPGaDcfvbhNvD0B8iD1oUr/txX35NjcaY6Ns/A=="
     },
     "mocha": {
-      "version": "9.0.3",
-      "resolved": "https://registry.npmjs.org/mocha/-/mocha-9.0.3.tgz",
-      "integrity": "sha512-hnYFrSefHxYS2XFGtN01x8un0EwNu2bzKvhpRFhgoybIvMaOkkL60IVPmkb5h6XDmUl4IMSB+rT5cIO4/4bJgg==",
+      "version": "9.1.0",
+      "resolved": "https://registry.npmjs.org/mocha/-/mocha-9.1.0.tgz",
+      "integrity": "sha512-Kjg/XxYOFFUi0h/FwMOeb6RoroiZ+P1yOfya6NK7h3dNhahrJx1r2XIT3ge4ZQvJM86mdjNA+W5phqRQh7DwCg==",
       "dev": true,
       "requires": {
         "@ungap/promise-all-settled": "1.1.2",
index f6971cd517963ac45d82d4e9c9105a5adb6da013..a16ad0e3a4997e3f76590c21ea11cc3e66e12c2a 100644 (file)
@@ -1,6 +1,6 @@
 {
   "name": "websub-hub",
-  "version": "1.1.5",
+  "version": "1.2.0",
   "description": "A WebSub Hub server implementation.",
   "main": "server.js",
   "scripts": {
@@ -38,7 +38,7 @@
     "axios": "^0.21.1",
     "better-sqlite3": "^7.4.3",
     "feedparser": "^2.2.10",
-    "htmlparser2": "^6.1.0",
+    "htmlparser2": "^7.0.0",
     "iconv": "^3.0.0",
     "pg-promise": "^10.11.0"
   },
@@ -47,7 +47,7 @@
     "eslint-plugin-node": "^11.1.0",
     "eslint-plugin-security": "^1.4.0",
     "eslint-plugin-sonarjs": "^0.10.0",
-    "mocha": "^9.0.3",
+    "mocha": "^9.1.0",
     "mocha-steps": "^1.3.0",
     "nyc": "^15.1.0",
     "pre-commit": "^1.2.2",
index 0cacc3bf086f2b57f0b10db1ef289c6be7961e15..55a0807b00757e2c2fa0fe8d50e7c22701b0bacf 100644 (file)
@@ -26,6 +26,22 @@ const validHash = (algorithm) => getHashes()
   .filter((h) => h.match(/^sha[0-9]+$/))
   .includes(algorithm);
 
+
+/**
+ * Return an array containing x if x is not an array.
+ * @param {*} x
+ */
+const ensureArray = (x) => {
+  if (x === undefined) {
+    return [];
+  }
+  if (!Array.isArray(x)) {
+    return Array(x);
+  }
+  return x;
+};
+
+
 /**
  * Recursively freeze an object.
  * @param {Object} o 
@@ -140,6 +156,7 @@ module.exports = {
   arrayChunk,
   attemptRetrySeconds,
   axiosResponseLogData,
+  ensureArray,
   freezeDeep,
   logTruncate,
   randomBytesAsync,
index 345714f328951f4f7964249a8884882390bb4abc..097da637bd418d4d655f04aa65651ad5df709e5d 100644 (file)
@@ -223,6 +223,7 @@ class Communication {
     }
 
     if (!topic.isActive) {
+      // These should be filtered out when selecting verification tasks to process.
       this.logger.debug(_scope, 'topic not active, skipping verification', { verification, requestId });
       await this.db.verificationRelease(dbCtx, verificationId);
       return;
@@ -328,11 +329,19 @@ class Communication {
         case Enum.Mode.Unsubscribe:
           if (verificationAccepted) {
             await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
+            if (topic.isDeleted) {
+              // Remove a deleted topic after the last subscription is notified.
+              await this.db.topicPendingDelete(txCtx, topic.id);
+            }
           }
           break;
 
         case Enum.Mode.Denied:
           await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
+          if (topic.isDeleted) {
+            // Remove a deleted topic after he last subscription is notified.
+            await this.db.topicPendingDelete(txCtx, topic.id);
+          }
           break;
 
         default:
@@ -431,6 +440,9 @@ class Communication {
       throw new Errors.InternalInconsistencyError('no such topic id');
     }
 
+    // Cull any expired subscriptions
+    await this.db.subscriptionDeleteExpired(dbCtx, topicId);
+
     logInfoData.url = topicId.url;
 
     if (topic.isDeleted) {
@@ -479,13 +491,15 @@ class Communication {
 
     const validHub = await this.linkHelper.validHub(topic.url, response.headers, response.data);
     if (!validHub) {
-      this.logger.debug(_scope, 'retrieved topic does not list us as hub', { logInfoData });
+      this.logger.info(_scope, 'retrieved topic does not list us as hub', { logInfoData });
       if (this.options.communication.strictTopicHubLink) {
         await this.db.transaction(dbCtx, async (txCtx) => {
           // Set as deleted and set content_updated so subscriptions are notified.
           await this.db.topicDeleted(txCtx, topicId);
           await this.db.topicFetchComplete(txCtx, topicId);
         });
+        // Attempt to remove from db, if no active subscriptions.
+        await this.db.topicPendingDelete(dbCtx, topicId);
         return;
       }
     }
index 95c901068092eb93ea8218cca55e775b1ffa5f10..de0cd44a1b9c5a77eb0e8fcb22771ffbf7c488c1 100644 (file)
@@ -351,6 +351,16 @@ class Database {
   }
 
 
+  /**
+   * Remove any expired subscriptions to a topic.
+   * @param {*} dbCtx
+   * @param {*} topicId
+   */
+  async subscriptionDeleteExpired(dbCtx, topicId) {
+    this._notImplemented('subscriptionDeleteExpired', arguments);
+  }
+
+
   /**
    * Claim subscriptions needing content updates attempted.
    * @param {*} dbCtx 
@@ -533,6 +543,7 @@ class Database {
     this._notImplemented('topicGetAll', arguments);
   }
 
+
   /**
    * Get topic data, without content.
    * @param {*} dbCtx 
@@ -563,14 +574,15 @@ class Database {
     this._notImplemented('topicGetContentById', arguments);
   }
 
-  // /**
-  //  * Call after an unsubscribe, to check if a topic is awaiting deletion, and that
-  //  * was the last subscription belaying it.
-  //  * @param {String|Integer} data topic url or id
-  //  */
-  // async topicPendingDelete(dbCtx, data) {
-  //   this._notImplemented('topicPendingDelete', arguments);
-  // }
+
+  /**
+   * Attempt to delete a topic, which must be set isDeleted, if there
+   * are no more subscriptions belaying its removal.
+   * @param {*} topicId
+   */
+  async topicPendingDelete(dbCtx, topicId) {
+    this._notImplemented('topicPendingDelete', arguments);
+  }
 
 
   /**
index 255909780fe71c41133fc4b29c30eb5ec563f27a..213fa1031c3c44fd29c48a46a8af86ff8bed6217 100644 (file)
@@ -421,6 +421,21 @@ class DatabasePostgres extends Database {
   }
 
 
+  async subscriptionDeleteExpired(dbCtx, topicId) {
+    const _scope = _fileScope('subscriptionDeleteExpired');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    try {
+      const result = await dbCtx.result(this.statement.subscriptionDeleteExpired, { topicId });
+      this.logger.debug(_scope, 'success', { topicId, deleted: result.rowCount });
+      return this._engineInfo(result);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topicId });
+      throw e;
+    }
+  }
+
+
   async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
     const _scope = _fileScope('subscriptionDeliveryClaim');
     this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
@@ -804,6 +819,37 @@ class DatabasePostgres extends Database {
   }
 
 
+  async topicPendingDelete(dbCtx, topicId) {
+    const _scope = _fileScope('topicPendingDelete');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    try {
+      await dbCtx.txIf(async (txCtx) => {
+        const topic = await txCtx.one(this.statement.topicGetById, { topicId });
+        if (!topic.isDeleted) {
+          this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
+          return;
+        }
+
+        const { count: subscriberCount } = await txCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl: topic.url });
+        if (subscriberCount) {
+          this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
+          return;
+        }
+
+        const result = await txCtx.result(this.statement.topicDeleteById, { topicId });
+        if (result.rowCount !== 1) {
+          throw new DBErrors.UnexpectedResult('did not delete topic');
+        }
+      });
+      this.logger.debug(_scope, 'success', { topicId });
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topicId });
+      throw e;
+    }
+  }
+
+
   async topicSet(dbCtx, data) {
     const _scope = _fileScope('topicSet');
     this.logger.debug(_scope, 'called', data);
diff --git a/src/db/postgres/sql/subscription-delete-expired.sql b/src/db/postgres/sql/subscription-delete-expired.sql
new file mode 100644 (file)
index 0000000..d0f96e7
--- /dev/null
@@ -0,0 +1,4 @@
+--
+DELETE FROM subscription
+WHERE topic_id = $(topicId) AND expires < now()
+
diff --git a/src/db/postgres/sql/topic-delete-by-id.sql b/src/db/postgres/sql/topic-delete-by-id.sql
new file mode 100644 (file)
index 0000000..1378016
--- /dev/null
@@ -0,0 +1,4 @@
+--
+DELETE FROM topic
+WHERE id = $(topicId)
+
index fba4e7ca326f828a6f9698258d2ec4628fb7665e..07d6633311113dcdaf2eae05dabcc912f94fdece 100644 (file)
@@ -359,6 +359,21 @@ class DatabaseSQLite extends Database {
   }
 
 
+  subscriptionDeleteExpired(dbCtx, topicId) {
+    const _scope = _fileScope('subscriptionDeleteExpired');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    try {
+      const result = this.statement.subscriptionDeleteExpired.run({ topicId });
+      this.logger.debug(_scope, 'success', { topicId, deleted: result.changes });
+      return this._engineInfo(result);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topicId });
+      throw e;
+    }
+  }
+
+
   subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
     const _scope = _fileScope('subscriptionDeliveryClaim');
     this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
@@ -768,6 +783,37 @@ class DatabaseSQLite extends Database {
   }
 
 
+  topicPendingDelete(dbCtx, topicId) {
+    const _scope = _fileScope('topicPendingDelete');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    try {
+      this.db.transaction(() => {
+        const topic = this.statement.topicGetById.get({ topicId });
+        if (!topic.isDeleted) {
+          this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
+          return;
+        }
+
+        const { count: subscriberCount } = this.statement.subscriptionCountByTopicUrl.get({ topicUrl: topic.url });
+        if (subscriberCount) {
+          this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
+          return;
+        }
+
+        const result = this.statement.topicDeleteById.run({ topicId });
+        if (result.changes !== 1) {
+          throw new DBErrors.UnexpectedResult('did not delete topic');
+        }
+      })();
+      this.logger.debug(_scope, 'success', { topicId });
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topicId });
+      throw e;
+    }
+  }
+
+
   topicSet(dbCtx, data) {
     const _scope = _fileScope('topicSet');
     this.logger.debug(_scope, 'called', data);
diff --git a/src/db/sqlite/sql/subscription-delete-expired.sql b/src/db/sqlite/sql/subscription-delete-expired.sql
new file mode 100644 (file)
index 0000000..fae9d2c
--- /dev/null
@@ -0,0 +1,3 @@
+--
+DELETE FROM subscription
+WHERE topic_id = :topicId AND expires < strftime('%s', 'now')
diff --git a/src/db/sqlite/sql/topic-delete-by-id.sql b/src/db/sqlite/sql/topic-delete-by-id.sql
new file mode 100644 (file)
index 0000000..a66364a
--- /dev/null
@@ -0,0 +1,4 @@
+--
+DELETE FROM topic
+WHERE id = :topicId
+
index fddc8e45ae6f272a346c2131a05d909c190d03d9..cae9e74ce734b6fec771f4c9ceb3927c8e1b9028 100644 (file)
@@ -144,7 +144,7 @@ class Manager {
     if (data.topic) {
       topic = await this.db.topicGetByUrl(dbCtx, data.topic);
 
-      if (!topic && this.options.manager.publicHub) {
+      if (!topic && this._newTopicCreationAllowed()) {
         this.logger.info(_scope, 'new topic from subscribe request', { data, requestId });
 
         try {
@@ -263,43 +263,165 @@ class Manager {
 
 
   /**
-   * Check that a publish request topic is valid and exists,
-   * and if it is, add topicId to data.
-   * For a public publish request, create topic if not exists.
+   * Determine if a topic url is allowed to be created.
+   * In the future, this may be more complicated.
+   * @returns {Boolean}
+   */
+  _newTopicCreationAllowed() {
+    return this.options.manager.publicHub;
+  }
+
+
+  /**
+   * Check that a publish request's topic(s) are valid and exist,
+   * returning an array with the results for each.
+   * For a public-hub publish request, creates topics if they do not exist.
    * @param {*} dbCtx
    * @param {RootData} data
    * @param {String[]} warn
    * @param {String[]} err
    * @param {String} requestId
    */
-  async _checkPublish(dbCtx, data, warn, err, requestId) {
+  async _publishTopics(dbCtx, data, requestId) {
     const _scope = _fileScope('_checkPublish');
 
-    const publishUrl = data.url || data.topic;
+    // Publish requests may include multiple topics, consider them all, but deduplicate.
+    const publishUrls = Array.from(new Set([
+      ...common.ensureArray(data.url),
+      ...common.ensureArray(data.topic),
+    ]));
+
+    // Map the requested topics to their ids, creating if necessary.
+    return Promise.all(publishUrls.map(async (url) => {
+      const result = {
+        url,
+        warn: [],
+        err: [],
+        topicId: undefined,
+      };
+      let topic = await this.db.topicGetByUrl(dbCtx, url);
+      if (!topic && this._newTopicCreationAllowed()) {
+        try {
+          new URL(url);
+        } catch (e) {
+          result.err.push('invalid topic url (failed to parse url)');
+          return result;
+        }
+        await this.db.topicSet(dbCtx, {
+          // TODO: accept a publisherValidationUrl parameter
+          url,
+        });
+        topic = await this.db.topicGetByUrl(dbCtx, url);
+        this.logger.info(_scope, 'new topic from publish request', { url, requestId });
+      }
+      if (!topic || topic.isDeleted) {
+        result.err.push('topic not supported');
+        return result;
+      }
+      result.topicId = topic.id;
+      return result;
+    }));
+  }
 
-    let topic = await this.db.topicGetByUrl(dbCtx, publishUrl);
-    if (!topic && this.options.manager.publicHub) {
-      this.logger.info(_scope, 'new topic from publish request', { data, requestId });
 
-      try {
-        new URL(publishUrl);
-      } catch (e) {
-        err.push('invalid topic url (failed to parse url)');
-        return;
+  /**
+   * Render response for multi-topic publish requests.
+   * @param {Object[]} publishTopics
+   */
+  static multiPublishContent(ctx, publishTopics) {
+    const responses = publishTopics.map((topic) => ({
+      href: topic.url,
+      status: topic.status,
+      statusMessage: topic.statusMessage,
+      errors: topic.err,
+      warnings: topic.warn,
+    }));
+    switch (ctx.responseType) {
+      case Enum.ContentType.ApplicationJson:
+        return JSON.stringify(responses);
+
+      case Enum.ContentType.TextPlain:
+      default: {
+        const textResponses = responses.map((response) => {
+          const details = Manager._prettyDetails(response.errors, response.warnings);
+          const textDetails = (details.length ? '\n' : '') + details.map((d) => `\t${d}`).join('\n');
+          return `${response.href} [${response.status} ${response.statusMessage}]${textDetails}`;
+        });
+        return textResponses.join('\n----\n');
       }
+    }
+  }
+
 
-      await this.db.topicSet(dbCtx, {
-        url: publishUrl,
-      });
-      topic = await this.db.topicGetByUrl(dbCtx, publishUrl);
+  /**
+   * Process a publish request.
+   * @param {*} dbCtx
+   * @param {Object} data
+   * @param {http.ServerResponse} res
+   * @param {Object} ctx
+   */
+  async _publishRequest(dbCtx, data, res, ctx) {
+    const _scope = _fileScope('_parsePublish');
+    this.logger.debug(_scope, 'called', { data });
+
+    const requestId = ctx.requestId;
+
+    // Parse and validate all the topics in the request.
+    data.publishTopics = await this._publishTopics(dbCtx, data, requestId);
+    if (!data.publishTopics || !data.publishTopics.length) {
+      const details = Manager._prettyDetails(['no valid topic urls to publish'], []);
+      throw new ResponseError(Enum.ErrorResponse.BadRequest, details);
     }
 
-    if (!topic || topic.isDeleted) {
-      err.push('not a supported topic');
-      return;
+    // Set status per topic
+    for (const topicResult of data.publishTopics) {
+      topicResult.status = topicResult.err.length ? 400 : 202;
+      topicResult.statusMessage = topicResult.err.length ? 'Bad Request' : 'Accepted';
     }
 
-    data.topicId = topic.id;
+    // Process the valid publish notifications
+    const validPublishTopics = data.publishTopics.filter((topicResult) => !topicResult.err.length);
+    try {
+      await Promise.all(validPublishTopics.map(async (topicResult) => this.db.topicFetchRequested(dbCtx, topicResult.topicId)));
+    } catch (e) {
+      this.logger.error(_scope, 'topicFetchRequest failed', { error: e, ctx, data, requestId });
+      throw e;
+    }
+
+    this.logger.info(_scope, 'request accepted', { ctx, data, requestId });
+
+    if (data.publishTopics.length === 1) {
+      const soleTopic = data.publishTopics[0];
+      res.statusCode = soleTopic.status;
+      res.end(Manager._prettyDetails(soleTopic.err, soleTopic.warn).join('\n'));
+    } else {
+      res.statusCode = 207;
+      res.end(Manager.multiPublishContent(ctx, data.publishTopics));
+    }
+
+    if (this.options.manager.processImmediately
+    &&  validPublishTopics.length) {
+      try {
+        await Promise.all(validPublishTopics.map(async (topicResult) => this.communication.topicFetchClaimAndProcessById(dbCtx, topicResult.topicId, requestId)));
+      } catch (e) {
+        this.logger.error(_scope, 'topicFetchClaimAndProcessById failed', { data, validPublishTopics, requestId });
+        // Don't bother re-throwing, as we've already ended this response.
+      }
+    }
+  }
+
+
+  /**
+   * Annotate any encountered issues.
+   * @param {String[]} err
+   * @param {String[]} warn
+   * @returns {String[]}
+   */
+  static _prettyDetails(err, warn) {
+    return [
+      ...err.map((entry) => `error: ${entry}`),
+      ...warn.map((entry) => `warning: ${entry}`),
+    ];
   }
 
 
@@ -322,15 +444,14 @@ class Manager {
 
     await this.db.context(async (dbCtx) => {
 
+      // Handle publish requests elsewhere
       if (data.mode === Enum.Mode.Publish) {
-        await this._checkPublish(dbCtx, data, warn, err, requestId);
-      } else {
-        await this._validateRootData(dbCtx, data, warn, err, requestId);
+        return this._publishRequest(dbCtx, data, res, ctx);
       }
 
-      const prettyErr = err.map((entry) => `error: ${entry}`);
-      const prettyWarn = warn.map((entry) => `warning: ${entry}`);
-      const details = prettyErr.concat(prettyWarn);
+      await this._validateRootData(dbCtx, data, warn, err, requestId);
+
+      const details = Manager._prettyDetails(err, warn);
 
       // Any errors are fatal.  Stop and report anything that went wrong.
       if (err.length) {
@@ -339,18 +460,11 @@ class Manager {
       }
 
       // Commit the request for later processing.
-      let fn, info, id;
+      let id;
       try {
-        if (data.mode === Enum.Mode.Publish) {
-          fn = 'topicFetchRequested';
-          info = await this.db.topicFetchRequested(dbCtx, data.topicId);
-          id = data.topicId;
-        } else {
-          fn = 'verificationInsert';
-          id = await this.db.verificationInsert(dbCtx, { ...data, requestId });
-        }
+        id = await this.db.verificationInsert(dbCtx, { ...data, requestId });
       } catch (e) {
-        this.logger.error(_scope, `${fn} failed`, { e, info, data, warn, id, requestId });
+        this.logger.error(_scope, 'verificationInsert failed', { error: e, data, warn, id, requestId });
         throw e;
       }
 
@@ -362,15 +476,9 @@ class Manager {
       if (this.options.manager.processImmediately
       &&  id) {
         try {
-          if (data.mode === Enum.Mode.Publish) {
-            fn = 'topicFetchClaimAndProcessById';
-            await this.communication.topicFetchClaimAndProcessById(dbCtx, id, requestId);
-          } else {
-            fn = 'verificationClaimAndProcessById';
-            await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId);
-          }
+          await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId);
         } catch (e) {
-          this.logger.error(_scope, `${fn} failed`, { ...data, id, requestId });
+          this.logger.error(_scope, 'verificationClaimAndProcessById failed', { ...data, id, requestId });
           // Don't bother re-throwing, as we've already ended this response.
         }
       }
@@ -518,6 +626,8 @@ class Manager {
           await this.db.topicDeleted(txCtx, topicId);
           res.end();
           this.logger.info(_scope, 'topic set deleted', { ctx, topicId });
+          // Attempt to remove from db if no active subscriptions.
+          await this.db.topicPendingDelete(txCtx, topicId);
           return;
         }
 
index b64ecb4ee5d3a71a5c9c5d5d3f16d08c826c7aeb..a9b228c8d4df349e9047b026745c15608aa3ea30 100644 (file)
@@ -160,4 +160,21 @@ describe('Common', function () {
     });
   }); // validHash
 
+  describe('ensureArray', function () {
+    it('returns empty array for no data', function () {
+      const result = common.ensureArray();
+      assert.deepStrictEqual(result, []);
+    });
+    it('returns same array passed in', function () {
+      const expected = [1, 2, 3, 'foo'];
+      const result = common.ensureArray(expected);
+      assert.deepStrictEqual(result, expected);
+    });
+    it('returns array containing non-array data', function () {
+      const data = 'bar';
+      const result = common.ensureArray(data);
+      assert.deepStrictEqual(result, [data]);
+    });
+  }); // ensureArray
+
 }); // Common
index ca5f34e92c536458171c18fa8cd4f78976a5bf11..960a0f994b27a572236a92131638b326c59f4ece 100644 (file)
@@ -405,6 +405,23 @@ describe('Communication', function () {
       assert(communication.db.verificationComplete.called);
     });
 
+    it('unsubscription from deleted topic deletes topic', async function () {
+      communication.db.verificationGetById.restore();
+      verification.mode = 'unsubscribe';
+      sinon.stub(communication.db, 'verificationGetById').resolves(verification);
+      communication.db.topicGetById.restore();
+      sinon.stub(communication.db, 'topicGetById').resolves({
+        ...topic,
+        isDeleted: true,
+      });
+
+      await communication.verificationProcess(dbCtx, callback, topicId, requestId);
+
+      assert(communication.db.subscriptionDelete.called);
+      assert(communication.db.verificationComplete.called);
+      assert(communication.db.topicPendingDelete.called);
+    });
+
     it('unsubscription denial succeeds', async function () {
       communication.db.verificationGetById.restore();
       verification.mode = 'unsubscribe';
index e6f632c897280586c404ebe2962fc747e08e7923..8d5b61553d24f3b72e4ec6d39fd73534de7617cd 100644 (file)
@@ -114,6 +114,7 @@ describe('Database Integration', function () {
       }); // Authentication
 
       describe('Topic', function () {
+        let anotherTopicId;
         step('requires data', async function () {
           try {
             await db.context(async (dbCtx) => {
@@ -222,7 +223,7 @@ describe('Database Integration', function () {
         step('deletes a topic', async function () {
           await db.context(async (dbCtx) => {
             const result = await db.topicSet(dbCtx, testData.anotherTopicSet);
-            const anotherTopicId = result.lastInsertRowid;
+            anotherTopicId = result.lastInsertRowid;
             await db.topicDeleted(dbCtx, anotherTopicId);
             const topic = await db.topicGetById(dbCtx, anotherTopicId);
             assert.strictEqual(topic.isDeleted, true);
@@ -231,7 +232,7 @@ describe('Database Integration', function () {
         step('update un-deletes a topic', async function () {
           await db.context(async (dbCtx) => {
             const result = await db.topicSet(dbCtx, testData.anotherTopicSet);
-            const anotherTopicId = result.lastInsertRowid;
+            assert.strictEqual(result.lastInsertRowid, anotherTopicId);
             const topic = await db.topicGetById(dbCtx, anotherTopicId);
             assert.strictEqual(topic.isDeleted, false);
           });
@@ -242,6 +243,15 @@ describe('Database Integration', function () {
             assert(topics.length);
           });
         });
+        // pending delete of deleted topic with no subscriptions
+        step('really deletes unsubscribed deleted topic', async function() {
+          await db.context(async (dbCtx) => {
+            await db.topicDeleted(dbCtx, anotherTopicId);
+            await db.topicPendingDelete(dbCtx, anotherTopicId);
+            const topic = await db.topicGetById(dbCtx, anotherTopicId);
+            assert(!topic);
+          });
+        });
       }); // Topic
 
       describe('Subscription', function () {
@@ -372,6 +382,28 @@ describe('Database Integration', function () {
             assert(!subscription);
           });
         });
+        step('create expired subscription', async function () {
+          const data = {
+            ...testData.subscriptionUpsert,
+            secret: 'newSecret',
+            topicId,
+            leaseSeconds: -1,
+          };
+          await db.context(async (dbCtx) => {
+            const result = await db.subscriptionUpsert(dbCtx, data);
+            assert(result.lastInsertRowid);
+            assert.notStrictEqual(result.lastInsertRowid, subscriptionId);
+            subscriptionId = result.lastInsertRowid;
+            assert.strictEqual(result.changes, 1);
+          });
+        });
+        step('delete expired subscriptions', async function() {
+          await db.context(async (dbCtx) => {
+            await db.subscriptionDeleteExpired(dbCtx, topicId)
+            const subscription = await db.subscriptionGet(dbCtx, testData.subscriptionUpsert.callback, topicId);
+            assert(!subscription);
+          });
+        });
       }); // Subscription
 
       describe('Verification', function () {
index ef4790542cd147a076a0fdc11985da6641f10b64..0f037d6fae91b6dd4eed3098626317c9797f9d79 100644 (file)
@@ -557,7 +557,7 @@ describe('DatabasePostgres', function () {
         changes: 1,
         lastInsertRowid: undefined,
         duration: 10,
-      }
+      };
       sinon.stub(db.db, 'result').resolves(dbResult);
       const result = await db.subscriptionDelete(dbCtx, callback, topicId);
       assert.deepStrictEqual(result, expected);
@@ -574,6 +574,34 @@ describe('DatabasePostgres', function () {
     });
   }); // subscriptionDelete
 
+  describe('subscriptionDeleteExpired', function () {
+    it('success', async function () {
+      const dbResult = {
+        rowCount: 1,
+        rows: [],
+        duration: 10,
+      };
+      const expected = {
+        changes: 1,
+        lastInsertRowid: undefined,
+        duration: 10,
+      };
+      sinon.stub(db.db, 'result').resolves(dbResult);
+      const result = await db.subscriptionDeleteExpired(dbCtx, topicId);
+      assert.deepStrictEqual(result, expected);
+    });
+    it('failure', async function() {
+      const expected = new Error();
+      sinon.stub(db.db, 'result').rejects(expected);
+      try {
+        await db.subscriptionDeleteExpired(dbCtx, topicId);
+        assert.fail(noExpectedException);
+      } catch (e) {
+        assert.deepStrictEqual(e, expected);
+      }
+    });
+  });
+
   describe('subscriptionDeliveryClaim', function () {
     it('success', async function() {
       const dbResult = [
@@ -1214,6 +1242,69 @@ describe('DatabasePostgres', function () {
     });
   }); // topicGetContentById
 
+  describe('topicPendingDelete', function () {
+    beforeEach(function () {
+      sinon.stub(db.db, 'one');
+      sinon.stub(db.db, 'result');
+    });
+    it('success', async function () {
+      db.db.one.onCall(0).resolves({
+        id: topicId,
+        isDeleted: true,
+      }).onCall(1).resolves({
+        count: 0,
+      });
+      const dbResult = {
+        rowCount: 1,
+        rows: [],
+        duration: 10,
+      };
+      db.db.result.resolves(dbResult);
+      await db.topicPendingDelete(dbCtx, topicId);
+      assert(db.db.result.called);
+    });
+    it('does not delete non-deleted topic', async function () {
+      db.db.one.onCall(0).resolves({
+        id: topicId,
+        isDeleted: false,
+      }).onCall(1).resolves({
+        count: 0,
+      });
+      await db.topicPendingDelete(dbCtx, topicId);
+      assert(!db.db.result.called);
+    });
+    it('does not delete topic with active subscriptions', async function () {
+      db.db.one.onCall(0).resolves({
+        id: topicId,
+        isDeleted: true,
+      }).onCall(1).resolves({
+        count: 10,
+      });
+      await db.topicPendingDelete(dbCtx, topicId);
+      assert(!db.db.result.called);
+    });
+    it('covers no deletion', async function () {
+      db.db.one.onCall(0).resolves({
+        id: topicId,
+        isDeleted: true,
+      }).onCall(1).resolves({
+        count: 0,
+      });
+      const dbResult = {
+        rowCount: 0,
+        rows: [],
+        duration: 10,
+      };
+      db.db.result.resolves(dbResult);
+      try {
+        await db.topicPendingDelete(dbCtx, topicId);
+        assert.fail(noExpectedException);
+      } catch (e) {
+        assert(e instanceof DBErrors.UnexpectedResult);
+      }
+    });
+  });
+
   describe('topicSet', function () {
     let data;
     beforeEach(function () {
index 0c96df364d1d4d5ecfb98b6de6e2a021a3c92a05..370d559909cfdfaa1bb2eb7020584e3f8e0eb7d1 100644 (file)
@@ -404,8 +404,34 @@ describe('DatabaseSQLite', function () {
     });
   }); // subscriptionDelete
 
+  describe('subscriptionDeleteExpired', function () {
+    it('success', async function () {
+      const dbResult = {
+        changes: 1,
+        lastInsertRowid: undefined,
+      };
+      const expected = {
+        changes: 1,
+        lastInsertRowid: undefined,
+      };
+      sinon.stub(db.statement.subscriptionDeleteExpired, 'run').returns(dbResult);
+      const result = await db.subscriptionDeleteExpired(dbCtx, topicId);
+      assert.deepStrictEqual(result, expected);
+    });
+    it('failure', async function () {
+      const expected = new Error();
+      sinon.stub(db.statement.subscriptionDeleteExpired, 'run').throws(expected);
+      try {
+        await db.subscriptionDeleteExpired(dbCtx, topicId);
+        assert.fail(noExpectedException);
+      } catch (e) {
+        assert.deepStrictEqual(e, expected);
+      }
+    });
+  });
+
   describe('subscriptionDeliveryClaim', function () {
-    it('success', async function() {
+    it('success', async function () {
       const dbAllResult = [
         {
           id: 'c2e254c5-aa6e-4a8f-b1a1-e474b07392bb',
@@ -1021,6 +1047,76 @@ describe('DatabaseSQLite', function () {
     });
   }); // topicGetContentById
 
+  describe('topicPendingDelete', function () {
+    beforeEach(function () {
+      sinon.stub(db.statement.topicGetById, 'get');
+      sinon.stub(db.statement.subscriptionCountByTopicUrl, 'get');
+      sinon.stub(db.statement.topicDeleteById, 'run');
+    });
+    it('success', async function () {
+      db.statement.topicGetById.get.returns({
+        id: topicId,
+        isDeleted: true,
+      });
+      db.statement.subscriptionCountByTopicUrl.get.returns({
+        count: 0,
+      });
+      db.statement.topicDeleteById.run.returns({
+        changes: 1,
+      });
+      db.topicPendingDelete(dbCtx, topicId);
+      assert(db.statement.topicDeleteById.run.called);
+    });
+    it('does not delete non-deleted topic', async function () {
+      db.statement.topicGetById.get.returns({
+        id: topicId,
+        isDeleted: false,
+      });
+      db.statement.subscriptionCountByTopicUrl.get.returns({
+        count: 0,
+      });
+      db.statement.topicDeleteById.run.returns({
+        changes: 1,
+      });
+      db.topicPendingDelete(dbCtx, topicId);
+      assert(!db.statement.topicDeleteById.run.called);
+    });
+    it('does not delete topic with active subscriptions', async function () {
+      db.statement.topicGetById.get.returns({
+        id: topicId,
+        isDeleted: true,
+      });
+      db.statement.subscriptionCountByTopicUrl.get.returns({
+        count: 10,
+      });
+      db.statement.topicDeleteById.run.returns({
+        changes: 1,
+      });
+      db.topicPendingDelete(dbCtx, topicId);
+      assert(!db.statement.topicDeleteById.run.called);
+    });
+    it('covers no deletion', async function () {
+      db.statement.topicGetById.get.returns({
+        id: topicId,
+        isDeleted: true,
+      });
+      db.statement.subscriptionCountByTopicUrl.get.returns({
+        count: 0,
+      });
+      db.statement.topicDeleteById.run.returns({
+        changes: 0,
+      });
+      try {
+        db.topicPendingDelete(dbCtx, topicId);
+        assert.fail(noExpectedException);
+
+      } catch (e) {
+        assert(e instanceof DBErrors.UnexpectedResult);
+      }
+      assert(db.statement.topicDeleteById.run.called);
+    });
+  });
+
   describe('topicSet', function () {
     let data;
     beforeEach(function () {
index 7870a55b45b62df6d4a8c3192501467c20b1e5c1..89e3d65fca2b8370f12de13c02e46553cdaea404 100644 (file)
@@ -37,6 +37,7 @@ describe('Manager', function () {
     manager = new Manager(stubLogger, stubDb, options);
     sinon.stub(manager.communication, 'verificationProcess');
     sinon.stub(manager.communication, 'topicFetchProcess');
+    sinon.stub(manager.communication, 'topicFetchClaimAndProcessById');
     stubDb._reset();
     stubLogger._reset();
   });
@@ -174,7 +175,7 @@ describe('Manager', function () {
       await manager.getAdminOverview(res, ctx);
       assert(res.end.called);
     });
-  });
+  }); // getAdminOverview
 
   describe('getTopicDetails', function () {
     it('covers', async function() {
@@ -559,13 +560,11 @@ describe('Manager', function () {
     });
   }); // _checkMode
 
-  describe('_checkPublish', function () {
-    let dbCtx, data, warn, err, requestId;
+  describe('_publishTopics', function () {
+    let dbCtx, data, requestId;
     beforeEach(function () {
       dbCtx = {};
       data = {};
-      warn = [];
-      err = [];
       requestId = 'blah';
     });
     it('succeeds', async function () {
@@ -573,26 +572,29 @@ describe('Manager', function () {
         id: 222,
       });
       Object.assign(data, testData.validPublishRootData);
-      await manager._checkPublish(dbCtx, data, warn, err, requestId);
-      assert.strictEqual(warn.length, 0, 'unexpected warnings length');
-      assert.strictEqual(err.length, 0, 'unexpected errors length');
-      assert.strictEqual(data.topicId, 222, 'unexpected topic id');
+      const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+      assert.strictEqual(topicResults.length, 1);
+      assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length');
+      assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length');
+      assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id');
     });
     it('fails bad url', async function () {
       Object.assign(data, testData.validPublishRootData, { topic: 'not_a_url' });
-      await manager._checkPublish(dbCtx, data, warn, err, requestId);
-      assert.strictEqual(err.length, 1, 'unexpected errors length');
-      assert.strictEqual(warn.length, 0);
+      const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+      assert.strictEqual(topicResults.length, 1);
+      assert.strictEqual(topicResults[0].err.length, 1, 'unexpected errors length');
+      assert.strictEqual(topicResults[0].warn.length, 0);
     });
     it('accepts new public publish topic', async function () {
       manager.db.topicGetByUrl.onCall(0).resolves().onCall(1).resolves({
         id: 222,
       });
       Object.assign(data, testData.validPublishRootData);
-      await manager._checkPublish(dbCtx, data, warn, err, requestId);
-      assert.strictEqual(warn.length, 0, 'unexpected warnings length');
-      assert.strictEqual(err.length, 0, 'unexpected errors length');
-      assert.strictEqual(data.topicId, 222, 'unexpected topic id');
+      const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+      assert.strictEqual(topicResults.length, 1);
+      assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length');
+      assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length');
+      assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id');
     });
     it('does not publish deleted topic', async function () {
       manager.db.topicGetByUrl.resolves({
@@ -600,12 +602,176 @@ describe('Manager', function () {
         isDeleted: true,
       });
       Object.assign(data, testData.validPublishRootData);
-      await manager._checkPublish(dbCtx, data, warn, err, requestId);
-      assert.strictEqual(warn.length, 0, 'unexpected warnings length');
-      assert.strictEqual(err.length, 1, 'unexpected errors length');
-      assert.strictEqual(data.topicId, undefined, 'unexpected topic id');
+      const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+      assert.strictEqual(topicResults.length, 1);
+      assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length');
+      assert.strictEqual(topicResults[0].err.length, 1, 'unexpected errors length');
+      assert.strictEqual(topicResults[0].topicId, undefined, 'unexpected topic id');
+    });
+    it('no topics', async function() {
+      Object.assign(data, testData.validPublishRootData);
+      delete data.topic;
+      const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+      assert.strictEqual(topicResults.length, 0);
+    });
+    it('multiple valid topics', async function () {
+      manager.db.topicGetByUrl.resolves({
+        id: 222,
+      });
+      Object.assign(data, testData.validPublishRootData);
+      data.url = ['https://example.com/first', 'https://example.com/second'];
+      data.topic = ['https://example.com/third'];
+      const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+      assert.strictEqual(topicResults.length, 3);
+      assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length');
+      assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length');
+      assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id');
+      assert.strictEqual(topicResults[1].warn.length, 0, 'unexpected warnings length');
+      assert.strictEqual(topicResults[1].err.length, 0, 'unexpected errors length');
+      assert.strictEqual(topicResults[1].topicId, 222, 'unexpected topic id');
+      assert.strictEqual(topicResults[2].warn.length, 0, 'unexpected warnings length');
+      assert.strictEqual(topicResults[2].err.length, 0, 'unexpected errors length');
+      assert.strictEqual(topicResults[2].topicId, 222, 'unexpected topic id');
+    });
+    it('mix of valid and invalid topics', async function () {
+      manager.db.topicGetByUrl.onCall(1).resolves().resolves({
+        id: 222,
+      });
+      Object.assign(data, testData.validPublishRootData);
+      data.url = ['https://example.com/first', 'not a url'];
+      data.topic = ['https://example.com/third'];
+      const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+      assert.strictEqual(topicResults.length, 3);
+      assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length');
+      assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length');
+      assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id');
+      assert.strictEqual(topicResults[1].warn.length, 0, 'unexpected warnings length');
+      assert.strictEqual(topicResults[1].err.length, 1, 'unexpected errors length');
+      assert.strictEqual(topicResults[1].topicId, undefined, 'unexpected topic id');
+      assert.strictEqual(topicResults[2].warn.length, 0, 'unexpected warnings length');
+      assert.strictEqual(topicResults[2].err.length, 0, 'unexpected errors length');
+      assert.strictEqual(topicResults[2].topicId, 222, 'unexpected topic id');
+    });
+  }); // _publishTopics
+
+  describe('_publishRequest', function () {
+    let dbCtx, data, res, ctx;
+    beforeEach(function () {
+      dbCtx = {};
+      data = {};
+      res = {
+        end: sinon.stub(),
+      };
+      ctx = {};
+    });
+    it('requires a topic', async function () {
+      try {
+        await manager._publishRequest(dbCtx, data, res, ctx);
+        assert.fail(noExpectedException);
+      } catch (e) {
+        assert(e instanceof Errors.ResponseError);
+      }
+    });
+    it('processes one topic', async function() {
+      manager.db.topicGetByUrl.resolves({
+        id: 222,
+      });
+      Object.assign(data, testData.validPublishRootData);
+      manager.db.topicFetchRequested.resolves();
+      await manager._publishRequest(dbCtx, data, res, ctx);
+      assert(manager.db.topicFetchRequested.called);
+      assert.strictEqual(res.statusCode, 202);
+      assert(res.end.called);
+    });
+    it('processes mix of valid and invalid topics', async function () {
+      ctx.responseType = 'application/json';
+      manager.db.topicGetByUrl.onCall(1).resolves().resolves({
+        id: 222,
+      });
+      Object.assign(data, testData.validPublishRootData);
+      data.url = ['https://example.com/first', 'not a url'];
+      data.topic = ['https://example.com/third'];
+      await manager._publishRequest(dbCtx, data, res, ctx);
+      assert.strictEqual(res.statusCode, 207);
+      assert(res.end.called);
+    });
+    it('covers topicFetchRequest failure', async function () {
+      manager.db.topicGetByUrl.resolves({
+        id: 222,
+      });
+      Object.assign(data, testData.validPublishRootData);
+      const expected = new Error('boo');
+      manager.db.topicFetchRequested.rejects(expected);
+      try {
+        await manager._publishRequest(dbCtx, data, res, ctx);
+        assert.fail(noExpectedException);
+      } catch (e) {
+        assert.deepStrictEqual(e, expected);
+      }
     });
-  }); // _checkPublish
+    it('covers immediate processing error', async function() {
+      manager.options.manager.processImmediately = true;
+      manager.db.topicGetByUrl.onCall(0).resolves().onCall(1).resolves({
+        id: 222,
+      });
+      manager.communication.topicFetchClaimAndProcessById.rejects();
+      Object.assign(data, testData.validPublishRootData);
+      await manager._publishRequest(dbCtx, data, res, ctx);
+      assert(manager.db.topicFetchRequested.called);
+      assert.strictEqual(res.statusCode, 202);
+      assert(res.end.called);
+      assert(manager.communication.topicFetchClaimAndProcessById.called)
+    });
+    it('covers no immediate processing', async function() {
+      manager.options.manager.processImmediately = false;
+      manager.db.topicGetByUrl.onCall(0).resolves().onCall(1).resolves({
+        id: 222,
+      });
+      Object.assign(data, testData.validPublishRootData);
+      await manager._publishRequest(dbCtx, data, res, ctx);
+      assert(manager.db.topicFetchRequested.called);
+      assert.strictEqual(res.statusCode, 202);
+      assert(res.end.called);
+      assert(!manager.communication.topicFetchClaimAndProcessById.called)
+    });
+  }); // _publishRequest
+
+  describe('multiPublishContent', function () {
+    let publishTopics;
+    beforeEach(function () {
+      publishTopics = [{
+        url: 'https://example.com/first',
+        warn: [],
+        err: [],
+        topicId: 222,
+        status: 202,
+        statusMessage: 'Accepted',
+      },
+      {
+        url: 'not a url',
+        warn: [],
+        err: [ 'invalid topic url (failed to parse url)' ],
+        topicId: undefined,
+        status: 400,
+        statusMessage: 'Bad Request',
+      }];
+    });
+    it('covers json response', function () {
+      ctx.responseType = 'application/json';
+      const expected = '[{"href":"https://example.com/first","status":202,"statusMessage":"Accepted","errors":[],"warnings":[]},{"href":"not a url","status":400,"statusMessage":"Bad Request","errors":["invalid topic url (failed to parse url)"],"warnings":[]}]';
+      const result = Manager.multiPublishContent(ctx, publishTopics);
+      assert.deepStrictEqual(result, expected);
+    });
+    it('covers text response', function () {
+      ctx.responseType = 'text/plain';
+      const expected = `https://example.com/first [202 Accepted]
+----
+not a url [400 Bad Request]
+\terror: invalid topic url (failed to parse url)`;
+      const result = Manager.multiPublishContent(ctx, publishTopics);
+      assert.deepStrictEqual(result, expected);
+    });
+  }); // multiPublishContent
 
   describe('processTasks', function () {
     it('covers', async function () {
index 5ef24228a8b5bf1db00257cf1745572d5c3ef742..f257cbfeeb0728af0b9b1bd135e49e7bb505d4c2 100644 (file)
@@ -17,6 +17,7 @@ const stubFns = [
   'subscriptionsByTopicId',
   'subscriptionCountByTopicUrl',
   'subscriptionDelete',
+  'subscriptionDeleteExpired',
   'subscriptionDeliveryClaim',
   'subscriptionDeliveryClaimById',
   'subscriptionDeliveryComplete',
@@ -36,6 +37,7 @@ const stubFns = [
   'topicGetById',
   'topicGetByUrl',
   'topicGetContentById',
+  'topicPendingDelete',
   'topicSet',
   'topicSetContent',
   'topicUpdate',