From cab7ebc31583981d0c235039afdfc9d63e730f02 Mon Sep 17 00:00:00 2001 From: Justin Wind Date: Fri, 20 Aug 2021 15:28:36 -0700 Subject: [PATCH] publish requests may now include multiple topic urls All publish notification urls passed in are now considered for topic updates. --- CHANGELOG.md | 4 + src/common.js | 17 ++++ src/manager.js | 200 ++++++++++++++++++++++++++++++++---------- test/src/common.js | 17 ++++ test/src/manager.js | 208 +++++++++++++++++++++++++++++++++++++++----- 5 files changed, 379 insertions(+), 67 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 966dede..a8521de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ Releases and notable changes to this project are documented here. ## [Unreleased] +### Added + +- Accept multiple topics in publish requests. + ## [v1.1.5] - 2021-08-23 ### Fixed diff --git a/src/common.js b/src/common.js index 0cacc3b..55a0807 100644 --- a/src/common.js +++ b/src/common.js @@ -26,6 +26,22 @@ const validHash = (algorithm) => getHashes() .filter((h) => h.match(/^sha[0-9]+$/)) .includes(algorithm); + +/** + * Return an array containing x if x is not an array. + * @param {*} x + */ +const ensureArray = (x) => { + if (x === undefined) { + return []; + } + if (!Array.isArray(x)) { + return Array(x); + } + return x; +}; + + /** * Recursively freeze an object. * @param {Object} o @@ -140,6 +156,7 @@ module.exports = { arrayChunk, attemptRetrySeconds, axiosResponseLogData, + ensureArray, freezeDeep, logTruncate, randomBytesAsync, diff --git a/src/manager.js b/src/manager.js index fddc8e4..0f11068 100644 --- a/src/manager.js +++ b/src/manager.js @@ -144,7 +144,7 @@ class Manager { if (data.topic) { topic = await this.db.topicGetByUrl(dbCtx, data.topic); - if (!topic && this.options.manager.publicHub) { + if (!topic && this._newTopicCreationAllowed()) { this.logger.info(_scope, 'new topic from subscribe request', { data, requestId }); try { @@ -263,43 +263,165 @@ class Manager { /** - * Check that a publish request topic is valid and exists, - * and if it is, add topicId to data. - * For a public publish request, create topic if not exists. + * Determine if a topic url is allowed to be created. + * In the future, this may be more complicated. + * @returns {Boolean} + */ + _newTopicCreationAllowed() { + return this.options.manager.publicHub; + } + + + /** + * Check that a publish request's topic(s) are valid and exist, + * returning an array with the results for each. + * For a public-hub publish request, creates topics if they do not exist. * @param {*} dbCtx * @param {RootData} data * @param {String[]} warn * @param {String[]} err * @param {String} requestId */ - async _checkPublish(dbCtx, data, warn, err, requestId) { + async _publishTopics(dbCtx, data, requestId) { const _scope = _fileScope('_checkPublish'); - const publishUrl = data.url || data.topic; + // Publish requests may include multiple topics, consider them all, but deduplicate. + const publishUrls = Array.from(new Set([ + ...common.ensureArray(data.url), + ...common.ensureArray(data.topic), + ])); + + // Map the requested topics to their ids, creating if necessary. + return Promise.all(publishUrls.map(async (url) => { + const result = { + url, + warn: [], + err: [], + topicId: undefined, + }; + let topic = await this.db.topicGetByUrl(dbCtx, url); + if (!topic && this._newTopicCreationAllowed()) { + try { + new URL(url); + } catch (e) { + result.err.push('invalid topic url (failed to parse url)'); + return result; + } + await this.db.topicSet(dbCtx, { + // TODO: accept a publisherValidationUrl parameter + url, + }); + topic = await this.db.topicGetByUrl(dbCtx, url); + this.logger.info(_scope, 'new topic from publish request', { url, requestId }); + } + if (!topic || topic.isDeleted) { + result.err.push('topic not supported'); + return result; + } + result.topicId = topic.id; + return result; + })); + } - let topic = await this.db.topicGetByUrl(dbCtx, publishUrl); - if (!topic && this.options.manager.publicHub) { - this.logger.info(_scope, 'new topic from publish request', { data, requestId }); - try { - new URL(publishUrl); - } catch (e) { - err.push('invalid topic url (failed to parse url)'); - return; + /** + * Render response for multi-topic publish requests. + * @param {Object[]} publishTopics + */ + static multiPublishContent(ctx, publishTopics) { + const responses = publishTopics.map((topic) => ({ + href: topic.url, + status: topic.status, + statusMessage: topic.statusMessage, + errors: topic.err, + warnings: topic.warn, + })); + switch (ctx.responseType) { + case Enum.ContentType.ApplicationJson: + return JSON.stringify(responses); + + case Enum.ContentType.TextPlain: + default: { + const textResponses = responses.map((response) => { + const details = Manager._prettyDetails(response.errors, response.warnings); + const textDetails = (details.length ? '\n' : '') + details.map((d) => `\t${d}`).join('\n'); + return `${response.href} [${response.status} ${response.statusMessage}]${textDetails}`; + }); + return textResponses.join('\n----\n'); } + } + } + - await this.db.topicSet(dbCtx, { - url: publishUrl, - }); - topic = await this.db.topicGetByUrl(dbCtx, publishUrl); + /** + * Process a publish request. + * @param {*} dbCtx + * @param {Object} data + * @param {http.ServerResponse} res + * @param {Object} ctx + */ + async _publishRequest(dbCtx, data, res, ctx) { + const _scope = _fileScope('_parsePublish'); + this.logger.debug(_scope, 'called', { data }); + + const requestId = ctx.requestId; + + // Parse and validate all the topics in the request. + data.publishTopics = await this._publishTopics(dbCtx, data, requestId); + if (!data.publishTopics || !data.publishTopics.length) { + const details = Manager._prettyDetails(['no valid topic urls to publish'], []); + throw new ResponseError(Enum.ErrorResponse.BadRequest, details); } - if (!topic || topic.isDeleted) { - err.push('not a supported topic'); - return; + // Set status per topic + for (const topicResult of data.publishTopics) { + topicResult.status = topicResult.err.length ? 400 : 202; + topicResult.statusMessage = topicResult.err.length ? 'Bad Request' : 'Accepted'; } - data.topicId = topic.id; + // Process the valid publish notifications + const validPublishTopics = data.publishTopics.filter((topicResult) => !topicResult.err.length); + try { + await Promise.all(validPublishTopics.map(async (topicResult) => this.db.topicFetchRequested(dbCtx, topicResult.topicId))); + } catch (e) { + this.logger.error(_scope, 'topicFetchRequest failed', { error: e, ctx, data, requestId }); + throw e; + } + + this.logger.info(_scope, 'request accepted', { ctx, data, requestId }); + + if (data.publishTopics.length === 1) { + const soleTopic = data.publishTopics[0]; + res.statusCode = soleTopic.status; + res.end(Manager._prettyDetails(soleTopic.err, soleTopic.warn).join('\n')); + } else { + res.statusCode = 207; + res.end(Manager.multiPublishContent(ctx, data.publishTopics)); + } + + if (this.options.manager.processImmediately + && validPublishTopics.length) { + try { + await Promise.all(validPublishTopics.map(async (topicResult) => this.communication.topicFetchClaimAndProcessById(dbCtx, topicResult.topicId, requestId))); + } catch (e) { + this.logger.error(_scope, 'topicFetchClaimAndProcessById failed', { data, validPublishTopics, requestId }); + // Don't bother re-throwing, as we've already ended this response. + } + } + } + + + /** + * Annotate any encountered issues. + * @param {String[]} err + * @param {String[]} warn + * @returns {String[]} + */ + static _prettyDetails(err, warn) { + return [ + ...err.map((entry) => `error: ${entry}`), + ...warn.map((entry) => `warning: ${entry}`), + ]; } @@ -322,15 +444,14 @@ class Manager { await this.db.context(async (dbCtx) => { + // Handle publish requests elsewhere if (data.mode === Enum.Mode.Publish) { - await this._checkPublish(dbCtx, data, warn, err, requestId); - } else { - await this._validateRootData(dbCtx, data, warn, err, requestId); + return this._publishRequest(dbCtx, data, res, ctx); } - const prettyErr = err.map((entry) => `error: ${entry}`); - const prettyWarn = warn.map((entry) => `warning: ${entry}`); - const details = prettyErr.concat(prettyWarn); + await this._validateRootData(dbCtx, data, warn, err, requestId); + + const details = Manager._prettyDetails(err, warn); // Any errors are fatal. Stop and report anything that went wrong. if (err.length) { @@ -339,18 +460,11 @@ class Manager { } // Commit the request for later processing. - let fn, info, id; + let id; try { - if (data.mode === Enum.Mode.Publish) { - fn = 'topicFetchRequested'; - info = await this.db.topicFetchRequested(dbCtx, data.topicId); - id = data.topicId; - } else { - fn = 'verificationInsert'; - id = await this.db.verificationInsert(dbCtx, { ...data, requestId }); - } + id = await this.db.verificationInsert(dbCtx, { ...data, requestId }); } catch (e) { - this.logger.error(_scope, `${fn} failed`, { e, info, data, warn, id, requestId }); + this.logger.error(_scope, 'verificationInsert failed', { error: e, data, warn, id, requestId }); throw e; } @@ -362,15 +476,9 @@ class Manager { if (this.options.manager.processImmediately && id) { try { - if (data.mode === Enum.Mode.Publish) { - fn = 'topicFetchClaimAndProcessById'; - await this.communication.topicFetchClaimAndProcessById(dbCtx, id, requestId); - } else { - fn = 'verificationClaimAndProcessById'; - await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId); - } + await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId); } catch (e) { - this.logger.error(_scope, `${fn} failed`, { ...data, id, requestId }); + this.logger.error(_scope, 'verificationClaimAndProcessById failed', { ...data, id, requestId }); // Don't bother re-throwing, as we've already ended this response. } } diff --git a/test/src/common.js b/test/src/common.js index b64ecb4..a9b228c 100644 --- a/test/src/common.js +++ b/test/src/common.js @@ -160,4 +160,21 @@ describe('Common', function () { }); }); // validHash + describe('ensureArray', function () { + it('returns empty array for no data', function () { + const result = common.ensureArray(); + assert.deepStrictEqual(result, []); + }); + it('returns same array passed in', function () { + const expected = [1, 2, 3, 'foo']; + const result = common.ensureArray(expected); + assert.deepStrictEqual(result, expected); + }); + it('returns array containing non-array data', function () { + const data = 'bar'; + const result = common.ensureArray(data); + assert.deepStrictEqual(result, [data]); + }); + }); // ensureArray + }); // Common diff --git a/test/src/manager.js b/test/src/manager.js index 7870a55..89e3d65 100644 --- a/test/src/manager.js +++ b/test/src/manager.js @@ -37,6 +37,7 @@ describe('Manager', function () { manager = new Manager(stubLogger, stubDb, options); sinon.stub(manager.communication, 'verificationProcess'); sinon.stub(manager.communication, 'topicFetchProcess'); + sinon.stub(manager.communication, 'topicFetchClaimAndProcessById'); stubDb._reset(); stubLogger._reset(); }); @@ -174,7 +175,7 @@ describe('Manager', function () { await manager.getAdminOverview(res, ctx); assert(res.end.called); }); - }); + }); // getAdminOverview describe('getTopicDetails', function () { it('covers', async function() { @@ -559,13 +560,11 @@ describe('Manager', function () { }); }); // _checkMode - describe('_checkPublish', function () { - let dbCtx, data, warn, err, requestId; + describe('_publishTopics', function () { + let dbCtx, data, requestId; beforeEach(function () { dbCtx = {}; data = {}; - warn = []; - err = []; requestId = 'blah'; }); it('succeeds', async function () { @@ -573,26 +572,29 @@ describe('Manager', function () { id: 222, }); Object.assign(data, testData.validPublishRootData); - await manager._checkPublish(dbCtx, data, warn, err, requestId); - assert.strictEqual(warn.length, 0, 'unexpected warnings length'); - assert.strictEqual(err.length, 0, 'unexpected errors length'); - assert.strictEqual(data.topicId, 222, 'unexpected topic id'); + const topicResults = await manager._publishTopics(dbCtx, data, requestId); + assert.strictEqual(topicResults.length, 1); + assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length'); + assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length'); + assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id'); }); it('fails bad url', async function () { Object.assign(data, testData.validPublishRootData, { topic: 'not_a_url' }); - await manager._checkPublish(dbCtx, data, warn, err, requestId); - assert.strictEqual(err.length, 1, 'unexpected errors length'); - assert.strictEqual(warn.length, 0); + const topicResults = await manager._publishTopics(dbCtx, data, requestId); + assert.strictEqual(topicResults.length, 1); + assert.strictEqual(topicResults[0].err.length, 1, 'unexpected errors length'); + assert.strictEqual(topicResults[0].warn.length, 0); }); it('accepts new public publish topic', async function () { manager.db.topicGetByUrl.onCall(0).resolves().onCall(1).resolves({ id: 222, }); Object.assign(data, testData.validPublishRootData); - await manager._checkPublish(dbCtx, data, warn, err, requestId); - assert.strictEqual(warn.length, 0, 'unexpected warnings length'); - assert.strictEqual(err.length, 0, 'unexpected errors length'); - assert.strictEqual(data.topicId, 222, 'unexpected topic id'); + const topicResults = await manager._publishTopics(dbCtx, data, requestId); + assert.strictEqual(topicResults.length, 1); + assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length'); + assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length'); + assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id'); }); it('does not publish deleted topic', async function () { manager.db.topicGetByUrl.resolves({ @@ -600,12 +602,176 @@ describe('Manager', function () { isDeleted: true, }); Object.assign(data, testData.validPublishRootData); - await manager._checkPublish(dbCtx, data, warn, err, requestId); - assert.strictEqual(warn.length, 0, 'unexpected warnings length'); - assert.strictEqual(err.length, 1, 'unexpected errors length'); - assert.strictEqual(data.topicId, undefined, 'unexpected topic id'); + const topicResults = await manager._publishTopics(dbCtx, data, requestId); + assert.strictEqual(topicResults.length, 1); + assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length'); + assert.strictEqual(topicResults[0].err.length, 1, 'unexpected errors length'); + assert.strictEqual(topicResults[0].topicId, undefined, 'unexpected topic id'); + }); + it('no topics', async function() { + Object.assign(data, testData.validPublishRootData); + delete data.topic; + const topicResults = await manager._publishTopics(dbCtx, data, requestId); + assert.strictEqual(topicResults.length, 0); + }); + it('multiple valid topics', async function () { + manager.db.topicGetByUrl.resolves({ + id: 222, + }); + Object.assign(data, testData.validPublishRootData); + data.url = ['https://example.com/first', 'https://example.com/second']; + data.topic = ['https://example.com/third']; + const topicResults = await manager._publishTopics(dbCtx, data, requestId); + assert.strictEqual(topicResults.length, 3); + assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length'); + assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length'); + assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id'); + assert.strictEqual(topicResults[1].warn.length, 0, 'unexpected warnings length'); + assert.strictEqual(topicResults[1].err.length, 0, 'unexpected errors length'); + assert.strictEqual(topicResults[1].topicId, 222, 'unexpected topic id'); + assert.strictEqual(topicResults[2].warn.length, 0, 'unexpected warnings length'); + assert.strictEqual(topicResults[2].err.length, 0, 'unexpected errors length'); + assert.strictEqual(topicResults[2].topicId, 222, 'unexpected topic id'); + }); + it('mix of valid and invalid topics', async function () { + manager.db.topicGetByUrl.onCall(1).resolves().resolves({ + id: 222, + }); + Object.assign(data, testData.validPublishRootData); + data.url = ['https://example.com/first', 'not a url']; + data.topic = ['https://example.com/third']; + const topicResults = await manager._publishTopics(dbCtx, data, requestId); + assert.strictEqual(topicResults.length, 3); + assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length'); + assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length'); + assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id'); + assert.strictEqual(topicResults[1].warn.length, 0, 'unexpected warnings length'); + assert.strictEqual(topicResults[1].err.length, 1, 'unexpected errors length'); + assert.strictEqual(topicResults[1].topicId, undefined, 'unexpected topic id'); + assert.strictEqual(topicResults[2].warn.length, 0, 'unexpected warnings length'); + assert.strictEqual(topicResults[2].err.length, 0, 'unexpected errors length'); + assert.strictEqual(topicResults[2].topicId, 222, 'unexpected topic id'); + }); + }); // _publishTopics + + describe('_publishRequest', function () { + let dbCtx, data, res, ctx; + beforeEach(function () { + dbCtx = {}; + data = {}; + res = { + end: sinon.stub(), + }; + ctx = {}; + }); + it('requires a topic', async function () { + try { + await manager._publishRequest(dbCtx, data, res, ctx); + assert.fail(noExpectedException); + } catch (e) { + assert(e instanceof Errors.ResponseError); + } + }); + it('processes one topic', async function() { + manager.db.topicGetByUrl.resolves({ + id: 222, + }); + Object.assign(data, testData.validPublishRootData); + manager.db.topicFetchRequested.resolves(); + await manager._publishRequest(dbCtx, data, res, ctx); + assert(manager.db.topicFetchRequested.called); + assert.strictEqual(res.statusCode, 202); + assert(res.end.called); + }); + it('processes mix of valid and invalid topics', async function () { + ctx.responseType = 'application/json'; + manager.db.topicGetByUrl.onCall(1).resolves().resolves({ + id: 222, + }); + Object.assign(data, testData.validPublishRootData); + data.url = ['https://example.com/first', 'not a url']; + data.topic = ['https://example.com/third']; + await manager._publishRequest(dbCtx, data, res, ctx); + assert.strictEqual(res.statusCode, 207); + assert(res.end.called); + }); + it('covers topicFetchRequest failure', async function () { + manager.db.topicGetByUrl.resolves({ + id: 222, + }); + Object.assign(data, testData.validPublishRootData); + const expected = new Error('boo'); + manager.db.topicFetchRequested.rejects(expected); + try { + await manager._publishRequest(dbCtx, data, res, ctx); + assert.fail(noExpectedException); + } catch (e) { + assert.deepStrictEqual(e, expected); + } }); - }); // _checkPublish + it('covers immediate processing error', async function() { + manager.options.manager.processImmediately = true; + manager.db.topicGetByUrl.onCall(0).resolves().onCall(1).resolves({ + id: 222, + }); + manager.communication.topicFetchClaimAndProcessById.rejects(); + Object.assign(data, testData.validPublishRootData); + await manager._publishRequest(dbCtx, data, res, ctx); + assert(manager.db.topicFetchRequested.called); + assert.strictEqual(res.statusCode, 202); + assert(res.end.called); + assert(manager.communication.topicFetchClaimAndProcessById.called) + }); + it('covers no immediate processing', async function() { + manager.options.manager.processImmediately = false; + manager.db.topicGetByUrl.onCall(0).resolves().onCall(1).resolves({ + id: 222, + }); + Object.assign(data, testData.validPublishRootData); + await manager._publishRequest(dbCtx, data, res, ctx); + assert(manager.db.topicFetchRequested.called); + assert.strictEqual(res.statusCode, 202); + assert(res.end.called); + assert(!manager.communication.topicFetchClaimAndProcessById.called) + }); + }); // _publishRequest + + describe('multiPublishContent', function () { + let publishTopics; + beforeEach(function () { + publishTopics = [{ + url: 'https://example.com/first', + warn: [], + err: [], + topicId: 222, + status: 202, + statusMessage: 'Accepted', + }, + { + url: 'not a url', + warn: [], + err: [ 'invalid topic url (failed to parse url)' ], + topicId: undefined, + status: 400, + statusMessage: 'Bad Request', + }]; + }); + it('covers json response', function () { + ctx.responseType = 'application/json'; + const expected = '[{"href":"https://example.com/first","status":202,"statusMessage":"Accepted","errors":[],"warnings":[]},{"href":"not a url","status":400,"statusMessage":"Bad Request","errors":["invalid topic url (failed to parse url)"],"warnings":[]}]'; + const result = Manager.multiPublishContent(ctx, publishTopics); + assert.deepStrictEqual(result, expected); + }); + it('covers text response', function () { + ctx.responseType = 'text/plain'; + const expected = `https://example.com/first [202 Accepted] +---- +not a url [400 Bad Request] +\terror: invalid topic url (failed to parse url)`; + const result = Manager.multiPublishContent(ctx, publishTopics); + assert.deepStrictEqual(result, expected); + }); + }); // multiPublishContent describe('processTasks', function () { it('covers', async function () { -- 2.45.2