publish requests may now include multiple topic urls
authorJustin Wind <justin.wind+git@gmail.com>
Fri, 20 Aug 2021 22:28:36 +0000 (15:28 -0700)
committerJustin Wind <justin.wind+git@gmail.com>
Mon, 23 Aug 2021 22:33:46 +0000 (15:33 -0700)
All publish notification urls passed in are now considered for topic
updates.

CHANGELOG.md
src/common.js
src/manager.js
test/src/common.js
test/src/manager.js

index 966dede5324bae5c60a68b606f7e693c20367e9f..a8521deb25012489398fc126cfca748525f6254d 100644 (file)
@@ -4,6 +4,10 @@ Releases and notable changes to this project are documented here.
 
 ## [Unreleased]
 
+### Added
+
+- Accept multiple topics in publish requests.
+
 ## [v1.1.5] - 2021-08-23
 
 ### Fixed
index 0cacc3bf086f2b57f0b10db1ef289c6be7961e15..55a0807b00757e2c2fa0fe8d50e7c22701b0bacf 100644 (file)
@@ -26,6 +26,22 @@ const validHash = (algorithm) => getHashes()
   .filter((h) => h.match(/^sha[0-9]+$/))
   .includes(algorithm);
 
+
+/**
+ * Return an array containing x if x is not an array.
+ * @param {*} x
+ */
+const ensureArray = (x) => {
+  if (x === undefined) {
+    return [];
+  }
+  if (!Array.isArray(x)) {
+    return Array(x);
+  }
+  return x;
+};
+
+
 /**
  * Recursively freeze an object.
  * @param {Object} o 
@@ -140,6 +156,7 @@ module.exports = {
   arrayChunk,
   attemptRetrySeconds,
   axiosResponseLogData,
+  ensureArray,
   freezeDeep,
   logTruncate,
   randomBytesAsync,
index fddc8e45ae6f272a346c2131a05d909c190d03d9..0f110682eb2b3f94ddd1fe0076dab22e620ce69a 100644 (file)
@@ -144,7 +144,7 @@ class Manager {
     if (data.topic) {
       topic = await this.db.topicGetByUrl(dbCtx, data.topic);
 
-      if (!topic && this.options.manager.publicHub) {
+      if (!topic && this._newTopicCreationAllowed()) {
         this.logger.info(_scope, 'new topic from subscribe request', { data, requestId });
 
         try {
@@ -263,43 +263,165 @@ class Manager {
 
 
   /**
-   * Check that a publish request topic is valid and exists,
-   * and if it is, add topicId to data.
-   * For a public publish request, create topic if not exists.
+   * Determine if a topic url is allowed to be created.
+   * In the future, this may be more complicated.
+   * @returns {Boolean}
+   */
+  _newTopicCreationAllowed() {
+    return this.options.manager.publicHub;
+  }
+
+
+  /**
+   * Check that a publish request's topic(s) are valid and exist,
+   * returning an array with the results for each.
+   * For a public-hub publish request, creates topics if they do not exist.
    * @param {*} dbCtx
    * @param {RootData} data
    * @param {String[]} warn
    * @param {String[]} err
    * @param {String} requestId
    */
-  async _checkPublish(dbCtx, data, warn, err, requestId) {
+  async _publishTopics(dbCtx, data, requestId) {
     const _scope = _fileScope('_checkPublish');
 
-    const publishUrl = data.url || data.topic;
+    // Publish requests may include multiple topics, consider them all, but deduplicate.
+    const publishUrls = Array.from(new Set([
+      ...common.ensureArray(data.url),
+      ...common.ensureArray(data.topic),
+    ]));
+
+    // Map the requested topics to their ids, creating if necessary.
+    return Promise.all(publishUrls.map(async (url) => {
+      const result = {
+        url,
+        warn: [],
+        err: [],
+        topicId: undefined,
+      };
+      let topic = await this.db.topicGetByUrl(dbCtx, url);
+      if (!topic && this._newTopicCreationAllowed()) {
+        try {
+          new URL(url);
+        } catch (e) {
+          result.err.push('invalid topic url (failed to parse url)');
+          return result;
+        }
+        await this.db.topicSet(dbCtx, {
+          // TODO: accept a publisherValidationUrl parameter
+          url,
+        });
+        topic = await this.db.topicGetByUrl(dbCtx, url);
+        this.logger.info(_scope, 'new topic from publish request', { url, requestId });
+      }
+      if (!topic || topic.isDeleted) {
+        result.err.push('topic not supported');
+        return result;
+      }
+      result.topicId = topic.id;
+      return result;
+    }));
+  }
 
-    let topic = await this.db.topicGetByUrl(dbCtx, publishUrl);
-    if (!topic && this.options.manager.publicHub) {
-      this.logger.info(_scope, 'new topic from publish request', { data, requestId });
 
-      try {
-        new URL(publishUrl);
-      } catch (e) {
-        err.push('invalid topic url (failed to parse url)');
-        return;
+  /**
+   * Render response for multi-topic publish requests.
+   * @param {Object[]} publishTopics
+   */
+  static multiPublishContent(ctx, publishTopics) {
+    const responses = publishTopics.map((topic) => ({
+      href: topic.url,
+      status: topic.status,
+      statusMessage: topic.statusMessage,
+      errors: topic.err,
+      warnings: topic.warn,
+    }));
+    switch (ctx.responseType) {
+      case Enum.ContentType.ApplicationJson:
+        return JSON.stringify(responses);
+
+      case Enum.ContentType.TextPlain:
+      default: {
+        const textResponses = responses.map((response) => {
+          const details = Manager._prettyDetails(response.errors, response.warnings);
+          const textDetails = (details.length ? '\n' : '') + details.map((d) => `\t${d}`).join('\n');
+          return `${response.href} [${response.status} ${response.statusMessage}]${textDetails}`;
+        });
+        return textResponses.join('\n----\n');
       }
+    }
+  }
+
 
-      await this.db.topicSet(dbCtx, {
-        url: publishUrl,
-      });
-      topic = await this.db.topicGetByUrl(dbCtx, publishUrl);
+  /**
+   * Process a publish request.
+   * @param {*} dbCtx
+   * @param {Object} data
+   * @param {http.ServerResponse} res
+   * @param {Object} ctx
+   */
+  async _publishRequest(dbCtx, data, res, ctx) {
+    const _scope = _fileScope('_parsePublish');
+    this.logger.debug(_scope, 'called', { data });
+
+    const requestId = ctx.requestId;
+
+    // Parse and validate all the topics in the request.
+    data.publishTopics = await this._publishTopics(dbCtx, data, requestId);
+    if (!data.publishTopics || !data.publishTopics.length) {
+      const details = Manager._prettyDetails(['no valid topic urls to publish'], []);
+      throw new ResponseError(Enum.ErrorResponse.BadRequest, details);
     }
 
-    if (!topic || topic.isDeleted) {
-      err.push('not a supported topic');
-      return;
+    // Set status per topic
+    for (const topicResult of data.publishTopics) {
+      topicResult.status = topicResult.err.length ? 400 : 202;
+      topicResult.statusMessage = topicResult.err.length ? 'Bad Request' : 'Accepted';
     }
 
-    data.topicId = topic.id;
+    // Process the valid publish notifications
+    const validPublishTopics = data.publishTopics.filter((topicResult) => !topicResult.err.length);
+    try {
+      await Promise.all(validPublishTopics.map(async (topicResult) => this.db.topicFetchRequested(dbCtx, topicResult.topicId)));
+    } catch (e) {
+      this.logger.error(_scope, 'topicFetchRequest failed', { error: e, ctx, data, requestId });
+      throw e;
+    }
+
+    this.logger.info(_scope, 'request accepted', { ctx, data, requestId });
+
+    if (data.publishTopics.length === 1) {
+      const soleTopic = data.publishTopics[0];
+      res.statusCode = soleTopic.status;
+      res.end(Manager._prettyDetails(soleTopic.err, soleTopic.warn).join('\n'));
+    } else {
+      res.statusCode = 207;
+      res.end(Manager.multiPublishContent(ctx, data.publishTopics));
+    }
+
+    if (this.options.manager.processImmediately
+    &&  validPublishTopics.length) {
+      try {
+        await Promise.all(validPublishTopics.map(async (topicResult) => this.communication.topicFetchClaimAndProcessById(dbCtx, topicResult.topicId, requestId)));
+      } catch (e) {
+        this.logger.error(_scope, 'topicFetchClaimAndProcessById failed', { data, validPublishTopics, requestId });
+        // Don't bother re-throwing, as we've already ended this response.
+      }
+    }
+  }
+
+
+  /**
+   * Annotate any encountered issues.
+   * @param {String[]} err
+   * @param {String[]} warn
+   * @returns {String[]}
+   */
+  static _prettyDetails(err, warn) {
+    return [
+      ...err.map((entry) => `error: ${entry}`),
+      ...warn.map((entry) => `warning: ${entry}`),
+    ];
   }
 
 
@@ -322,15 +444,14 @@ class Manager {
 
     await this.db.context(async (dbCtx) => {
 
+      // Handle publish requests elsewhere
       if (data.mode === Enum.Mode.Publish) {
-        await this._checkPublish(dbCtx, data, warn, err, requestId);
-      } else {
-        await this._validateRootData(dbCtx, data, warn, err, requestId);
+        return this._publishRequest(dbCtx, data, res, ctx);
       }
 
-      const prettyErr = err.map((entry) => `error: ${entry}`);
-      const prettyWarn = warn.map((entry) => `warning: ${entry}`);
-      const details = prettyErr.concat(prettyWarn);
+      await this._validateRootData(dbCtx, data, warn, err, requestId);
+
+      const details = Manager._prettyDetails(err, warn);
 
       // Any errors are fatal.  Stop and report anything that went wrong.
       if (err.length) {
@@ -339,18 +460,11 @@ class Manager {
       }
 
       // Commit the request for later processing.
-      let fn, info, id;
+      let id;
       try {
-        if (data.mode === Enum.Mode.Publish) {
-          fn = 'topicFetchRequested';
-          info = await this.db.topicFetchRequested(dbCtx, data.topicId);
-          id = data.topicId;
-        } else {
-          fn = 'verificationInsert';
-          id = await this.db.verificationInsert(dbCtx, { ...data, requestId });
-        }
+        id = await this.db.verificationInsert(dbCtx, { ...data, requestId });
       } catch (e) {
-        this.logger.error(_scope, `${fn} failed`, { e, info, data, warn, id, requestId });
+        this.logger.error(_scope, 'verificationInsert failed', { error: e, data, warn, id, requestId });
         throw e;
       }
 
@@ -362,15 +476,9 @@ class Manager {
       if (this.options.manager.processImmediately
       &&  id) {
         try {
-          if (data.mode === Enum.Mode.Publish) {
-            fn = 'topicFetchClaimAndProcessById';
-            await this.communication.topicFetchClaimAndProcessById(dbCtx, id, requestId);
-          } else {
-            fn = 'verificationClaimAndProcessById';
-            await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId);
-          }
+          await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId);
         } catch (e) {
-          this.logger.error(_scope, `${fn} failed`, { ...data, id, requestId });
+          this.logger.error(_scope, 'verificationClaimAndProcessById failed', { ...data, id, requestId });
           // Don't bother re-throwing, as we've already ended this response.
         }
       }
index b64ecb4ee5d3a71a5c9c5d5d3f16d08c826c7aeb..a9b228c8d4df349e9047b026745c15608aa3ea30 100644 (file)
@@ -160,4 +160,21 @@ describe('Common', function () {
     });
   }); // validHash
 
+  describe('ensureArray', function () {
+    it('returns empty array for no data', function () {
+      const result = common.ensureArray();
+      assert.deepStrictEqual(result, []);
+    });
+    it('returns same array passed in', function () {
+      const expected = [1, 2, 3, 'foo'];
+      const result = common.ensureArray(expected);
+      assert.deepStrictEqual(result, expected);
+    });
+    it('returns array containing non-array data', function () {
+      const data = 'bar';
+      const result = common.ensureArray(data);
+      assert.deepStrictEqual(result, [data]);
+    });
+  }); // ensureArray
+
 }); // Common
index 7870a55b45b62df6d4a8c3192501467c20b1e5c1..89e3d65fca2b8370f12de13c02e46553cdaea404 100644 (file)
@@ -37,6 +37,7 @@ describe('Manager', function () {
     manager = new Manager(stubLogger, stubDb, options);
     sinon.stub(manager.communication, 'verificationProcess');
     sinon.stub(manager.communication, 'topicFetchProcess');
+    sinon.stub(manager.communication, 'topicFetchClaimAndProcessById');
     stubDb._reset();
     stubLogger._reset();
   });
@@ -174,7 +175,7 @@ describe('Manager', function () {
       await manager.getAdminOverview(res, ctx);
       assert(res.end.called);
     });
-  });
+  }); // getAdminOverview
 
   describe('getTopicDetails', function () {
     it('covers', async function() {
@@ -559,13 +560,11 @@ describe('Manager', function () {
     });
   }); // _checkMode
 
-  describe('_checkPublish', function () {
-    let dbCtx, data, warn, err, requestId;
+  describe('_publishTopics', function () {
+    let dbCtx, data, requestId;
     beforeEach(function () {
       dbCtx = {};
       data = {};
-      warn = [];
-      err = [];
       requestId = 'blah';
     });
     it('succeeds', async function () {
@@ -573,26 +572,29 @@ describe('Manager', function () {
         id: 222,
       });
       Object.assign(data, testData.validPublishRootData);
-      await manager._checkPublish(dbCtx, data, warn, err, requestId);
-      assert.strictEqual(warn.length, 0, 'unexpected warnings length');
-      assert.strictEqual(err.length, 0, 'unexpected errors length');
-      assert.strictEqual(data.topicId, 222, 'unexpected topic id');
+      const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+      assert.strictEqual(topicResults.length, 1);
+      assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length');
+      assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length');
+      assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id');
     });
     it('fails bad url', async function () {
       Object.assign(data, testData.validPublishRootData, { topic: 'not_a_url' });
-      await manager._checkPublish(dbCtx, data, warn, err, requestId);
-      assert.strictEqual(err.length, 1, 'unexpected errors length');
-      assert.strictEqual(warn.length, 0);
+      const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+      assert.strictEqual(topicResults.length, 1);
+      assert.strictEqual(topicResults[0].err.length, 1, 'unexpected errors length');
+      assert.strictEqual(topicResults[0].warn.length, 0);
     });
     it('accepts new public publish topic', async function () {
       manager.db.topicGetByUrl.onCall(0).resolves().onCall(1).resolves({
         id: 222,
       });
       Object.assign(data, testData.validPublishRootData);
-      await manager._checkPublish(dbCtx, data, warn, err, requestId);
-      assert.strictEqual(warn.length, 0, 'unexpected warnings length');
-      assert.strictEqual(err.length, 0, 'unexpected errors length');
-      assert.strictEqual(data.topicId, 222, 'unexpected topic id');
+      const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+      assert.strictEqual(topicResults.length, 1);
+      assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length');
+      assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length');
+      assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id');
     });
     it('does not publish deleted topic', async function () {
       manager.db.topicGetByUrl.resolves({
@@ -600,12 +602,176 @@ describe('Manager', function () {
         isDeleted: true,
       });
       Object.assign(data, testData.validPublishRootData);
-      await manager._checkPublish(dbCtx, data, warn, err, requestId);
-      assert.strictEqual(warn.length, 0, 'unexpected warnings length');
-      assert.strictEqual(err.length, 1, 'unexpected errors length');
-      assert.strictEqual(data.topicId, undefined, 'unexpected topic id');
+      const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+      assert.strictEqual(topicResults.length, 1);
+      assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length');
+      assert.strictEqual(topicResults[0].err.length, 1, 'unexpected errors length');
+      assert.strictEqual(topicResults[0].topicId, undefined, 'unexpected topic id');
+    });
+    it('no topics', async function() {
+      Object.assign(data, testData.validPublishRootData);
+      delete data.topic;
+      const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+      assert.strictEqual(topicResults.length, 0);
+    });
+    it('multiple valid topics', async function () {
+      manager.db.topicGetByUrl.resolves({
+        id: 222,
+      });
+      Object.assign(data, testData.validPublishRootData);
+      data.url = ['https://example.com/first', 'https://example.com/second'];
+      data.topic = ['https://example.com/third'];
+      const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+      assert.strictEqual(topicResults.length, 3);
+      assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length');
+      assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length');
+      assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id');
+      assert.strictEqual(topicResults[1].warn.length, 0, 'unexpected warnings length');
+      assert.strictEqual(topicResults[1].err.length, 0, 'unexpected errors length');
+      assert.strictEqual(topicResults[1].topicId, 222, 'unexpected topic id');
+      assert.strictEqual(topicResults[2].warn.length, 0, 'unexpected warnings length');
+      assert.strictEqual(topicResults[2].err.length, 0, 'unexpected errors length');
+      assert.strictEqual(topicResults[2].topicId, 222, 'unexpected topic id');
+    });
+    it('mix of valid and invalid topics', async function () {
+      manager.db.topicGetByUrl.onCall(1).resolves().resolves({
+        id: 222,
+      });
+      Object.assign(data, testData.validPublishRootData);
+      data.url = ['https://example.com/first', 'not a url'];
+      data.topic = ['https://example.com/third'];
+      const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+      assert.strictEqual(topicResults.length, 3);
+      assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length');
+      assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length');
+      assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id');
+      assert.strictEqual(topicResults[1].warn.length, 0, 'unexpected warnings length');
+      assert.strictEqual(topicResults[1].err.length, 1, 'unexpected errors length');
+      assert.strictEqual(topicResults[1].topicId, undefined, 'unexpected topic id');
+      assert.strictEqual(topicResults[2].warn.length, 0, 'unexpected warnings length');
+      assert.strictEqual(topicResults[2].err.length, 0, 'unexpected errors length');
+      assert.strictEqual(topicResults[2].topicId, 222, 'unexpected topic id');
+    });
+  }); // _publishTopics
+
+  describe('_publishRequest', function () {
+    let dbCtx, data, res, ctx;
+    beforeEach(function () {
+      dbCtx = {};
+      data = {};
+      res = {
+        end: sinon.stub(),
+      };
+      ctx = {};
+    });
+    it('requires a topic', async function () {
+      try {
+        await manager._publishRequest(dbCtx, data, res, ctx);
+        assert.fail(noExpectedException);
+      } catch (e) {
+        assert(e instanceof Errors.ResponseError);
+      }
+    });
+    it('processes one topic', async function() {
+      manager.db.topicGetByUrl.resolves({
+        id: 222,
+      });
+      Object.assign(data, testData.validPublishRootData);
+      manager.db.topicFetchRequested.resolves();
+      await manager._publishRequest(dbCtx, data, res, ctx);
+      assert(manager.db.topicFetchRequested.called);
+      assert.strictEqual(res.statusCode, 202);
+      assert(res.end.called);
+    });
+    it('processes mix of valid and invalid topics', async function () {
+      ctx.responseType = 'application/json';
+      manager.db.topicGetByUrl.onCall(1).resolves().resolves({
+        id: 222,
+      });
+      Object.assign(data, testData.validPublishRootData);
+      data.url = ['https://example.com/first', 'not a url'];
+      data.topic = ['https://example.com/third'];
+      await manager._publishRequest(dbCtx, data, res, ctx);
+      assert.strictEqual(res.statusCode, 207);
+      assert(res.end.called);
+    });
+    it('covers topicFetchRequest failure', async function () {
+      manager.db.topicGetByUrl.resolves({
+        id: 222,
+      });
+      Object.assign(data, testData.validPublishRootData);
+      const expected = new Error('boo');
+      manager.db.topicFetchRequested.rejects(expected);
+      try {
+        await manager._publishRequest(dbCtx, data, res, ctx);
+        assert.fail(noExpectedException);
+      } catch (e) {
+        assert.deepStrictEqual(e, expected);
+      }
     });
-  }); // _checkPublish
+    it('covers immediate processing error', async function() {
+      manager.options.manager.processImmediately = true;
+      manager.db.topicGetByUrl.onCall(0).resolves().onCall(1).resolves({
+        id: 222,
+      });
+      manager.communication.topicFetchClaimAndProcessById.rejects();
+      Object.assign(data, testData.validPublishRootData);
+      await manager._publishRequest(dbCtx, data, res, ctx);
+      assert(manager.db.topicFetchRequested.called);
+      assert.strictEqual(res.statusCode, 202);
+      assert(res.end.called);
+      assert(manager.communication.topicFetchClaimAndProcessById.called)
+    });
+    it('covers no immediate processing', async function() {
+      manager.options.manager.processImmediately = false;
+      manager.db.topicGetByUrl.onCall(0).resolves().onCall(1).resolves({
+        id: 222,
+      });
+      Object.assign(data, testData.validPublishRootData);
+      await manager._publishRequest(dbCtx, data, res, ctx);
+      assert(manager.db.topicFetchRequested.called);
+      assert.strictEqual(res.statusCode, 202);
+      assert(res.end.called);
+      assert(!manager.communication.topicFetchClaimAndProcessById.called)
+    });
+  }); // _publishRequest
+
+  describe('multiPublishContent', function () {
+    let publishTopics;
+    beforeEach(function () {
+      publishTopics = [{
+        url: 'https://example.com/first',
+        warn: [],
+        err: [],
+        topicId: 222,
+        status: 202,
+        statusMessage: 'Accepted',
+      },
+      {
+        url: 'not a url',
+        warn: [],
+        err: [ 'invalid topic url (failed to parse url)' ],
+        topicId: undefined,
+        status: 400,
+        statusMessage: 'Bad Request',
+      }];
+    });
+    it('covers json response', function () {
+      ctx.responseType = 'application/json';
+      const expected = '[{"href":"https://example.com/first","status":202,"statusMessage":"Accepted","errors":[],"warnings":[]},{"href":"not a url","status":400,"statusMessage":"Bad Request","errors":["invalid topic url (failed to parse url)"],"warnings":[]}]';
+      const result = Manager.multiPublishContent(ctx, publishTopics);
+      assert.deepStrictEqual(result, expected);
+    });
+    it('covers text response', function () {
+      ctx.responseType = 'text/plain';
+      const expected = `https://example.com/first [202 Accepted]
+----
+not a url [400 Bad Request]
+\terror: invalid topic url (failed to parse url)`;
+      const result = Manager.multiPublishContent(ctx, publishTopics);
+      assert.deepStrictEqual(result, expected);
+    });
+  }); // multiPublishContent
 
   describe('processTasks', function () {
     it('covers', async function () {