if (data.topic) {
topic = await this.db.topicGetByUrl(dbCtx, data.topic);
- if (!topic && this.options.manager.publicHub) {
+ if (!topic && this._newTopicCreationAllowed()) {
this.logger.info(_scope, 'new topic from subscribe request', { data, requestId });
try {
/**
- * Check that a publish request topic is valid and exists,
- * and if it is, add topicId to data.
- * For a public publish request, create topic if not exists.
+ * Determine if a topic url is allowed to be created.
+ * In the future, this may be more complicated.
+ * @returns {Boolean}
+ */
+ _newTopicCreationAllowed() {
+ return this.options.manager.publicHub;
+ }
+
+
+ /**
+ * Check that a publish request's topic(s) are valid and exist,
+ * returning an array with the results for each.
+ * For a public-hub publish request, creates topics if they do not exist.
* @param {*} dbCtx
* @param {RootData} data
* @param {String[]} warn
* @param {String[]} err
* @param {String} requestId
*/
- async _checkPublish(dbCtx, data, warn, err, requestId) {
+ async _publishTopics(dbCtx, data, requestId) {
const _scope = _fileScope('_checkPublish');
- const publishUrl = data.url || data.topic;
+ // Publish requests may include multiple topics, consider them all, but deduplicate.
+ const publishUrls = Array.from(new Set([
+ ...common.ensureArray(data.url),
+ ...common.ensureArray(data.topic),
+ ]));
+
+ // Map the requested topics to their ids, creating if necessary.
+ return Promise.all(publishUrls.map(async (url) => {
+ const result = {
+ url,
+ warn: [],
+ err: [],
+ topicId: undefined,
+ };
+ let topic = await this.db.topicGetByUrl(dbCtx, url);
+ if (!topic && this._newTopicCreationAllowed()) {
+ try {
+ new URL(url);
+ } catch (e) {
+ result.err.push('invalid topic url (failed to parse url)');
+ return result;
+ }
+ await this.db.topicSet(dbCtx, {
+ // TODO: accept a publisherValidationUrl parameter
+ url,
+ });
+ topic = await this.db.topicGetByUrl(dbCtx, url);
+ this.logger.info(_scope, 'new topic from publish request', { url, requestId });
+ }
+ if (!topic || topic.isDeleted) {
+ result.err.push('topic not supported');
+ return result;
+ }
+ result.topicId = topic.id;
+ return result;
+ }));
+ }
- let topic = await this.db.topicGetByUrl(dbCtx, publishUrl);
- if (!topic && this.options.manager.publicHub) {
- this.logger.info(_scope, 'new topic from publish request', { data, requestId });
- try {
- new URL(publishUrl);
- } catch (e) {
- err.push('invalid topic url (failed to parse url)');
- return;
+ /**
+ * Render response for multi-topic publish requests.
+ * @param {Object[]} publishTopics
+ */
+ static multiPublishContent(ctx, publishTopics) {
+ const responses = publishTopics.map((topic) => ({
+ href: topic.url,
+ status: topic.status,
+ statusMessage: topic.statusMessage,
+ errors: topic.err,
+ warnings: topic.warn,
+ }));
+ switch (ctx.responseType) {
+ case Enum.ContentType.ApplicationJson:
+ return JSON.stringify(responses);
+
+ case Enum.ContentType.TextPlain:
+ default: {
+ const textResponses = responses.map((response) => {
+ const details = Manager._prettyDetails(response.errors, response.warnings);
+ const textDetails = (details.length ? '\n' : '') + details.map((d) => `\t${d}`).join('\n');
+ return `${response.href} [${response.status} ${response.statusMessage}]${textDetails}`;
+ });
+ return textResponses.join('\n----\n');
}
+ }
+ }
+
- await this.db.topicSet(dbCtx, {
- url: publishUrl,
- });
- topic = await this.db.topicGetByUrl(dbCtx, publishUrl);
+ /**
+ * Process a publish request.
+ * @param {*} dbCtx
+ * @param {Object} data
+ * @param {http.ServerResponse} res
+ * @param {Object} ctx
+ */
+ async _publishRequest(dbCtx, data, res, ctx) {
+ const _scope = _fileScope('_parsePublish');
+ this.logger.debug(_scope, 'called', { data });
+
+ const requestId = ctx.requestId;
+
+ // Parse and validate all the topics in the request.
+ data.publishTopics = await this._publishTopics(dbCtx, data, requestId);
+ if (!data.publishTopics || !data.publishTopics.length) {
+ const details = Manager._prettyDetails(['no valid topic urls to publish'], []);
+ throw new ResponseError(Enum.ErrorResponse.BadRequest, details);
}
- if (!topic || topic.isDeleted) {
- err.push('not a supported topic');
- return;
+ // Set status per topic
+ for (const topicResult of data.publishTopics) {
+ topicResult.status = topicResult.err.length ? 400 : 202;
+ topicResult.statusMessage = topicResult.err.length ? 'Bad Request' : 'Accepted';
}
- data.topicId = topic.id;
+ // Process the valid publish notifications
+ const validPublishTopics = data.publishTopics.filter((topicResult) => !topicResult.err.length);
+ try {
+ await Promise.all(validPublishTopics.map(async (topicResult) => this.db.topicFetchRequested(dbCtx, topicResult.topicId)));
+ } catch (e) {
+ this.logger.error(_scope, 'topicFetchRequest failed', { error: e, ctx, data, requestId });
+ throw e;
+ }
+
+ this.logger.info(_scope, 'request accepted', { ctx, data, requestId });
+
+ if (data.publishTopics.length === 1) {
+ const soleTopic = data.publishTopics[0];
+ res.statusCode = soleTopic.status;
+ res.end(Manager._prettyDetails(soleTopic.err, soleTopic.warn).join('\n'));
+ } else {
+ res.statusCode = 207;
+ res.end(Manager.multiPublishContent(ctx, data.publishTopics));
+ }
+
+ if (this.options.manager.processImmediately
+ && validPublishTopics.length) {
+ try {
+ await Promise.all(validPublishTopics.map(async (topicResult) => this.communication.topicFetchClaimAndProcessById(dbCtx, topicResult.topicId, requestId)));
+ } catch (e) {
+ this.logger.error(_scope, 'topicFetchClaimAndProcessById failed', { data, validPublishTopics, requestId });
+ // Don't bother re-throwing, as we've already ended this response.
+ }
+ }
+ }
+
+
+ /**
+ * Annotate any encountered issues.
+ * @param {String[]} err
+ * @param {String[]} warn
+ * @returns {String[]}
+ */
+ static _prettyDetails(err, warn) {
+ return [
+ ...err.map((entry) => `error: ${entry}`),
+ ...warn.map((entry) => `warning: ${entry}`),
+ ];
}
await this.db.context(async (dbCtx) => {
+ // Handle publish requests elsewhere
if (data.mode === Enum.Mode.Publish) {
- await this._checkPublish(dbCtx, data, warn, err, requestId);
- } else {
- await this._validateRootData(dbCtx, data, warn, err, requestId);
+ return this._publishRequest(dbCtx, data, res, ctx);
}
- const prettyErr = err.map((entry) => `error: ${entry}`);
- const prettyWarn = warn.map((entry) => `warning: ${entry}`);
- const details = prettyErr.concat(prettyWarn);
+ await this._validateRootData(dbCtx, data, warn, err, requestId);
+
+ const details = Manager._prettyDetails(err, warn);
// Any errors are fatal. Stop and report anything that went wrong.
if (err.length) {
}
// Commit the request for later processing.
- let fn, info, id;
+ let id;
try {
- if (data.mode === Enum.Mode.Publish) {
- fn = 'topicFetchRequested';
- info = await this.db.topicFetchRequested(dbCtx, data.topicId);
- id = data.topicId;
- } else {
- fn = 'verificationInsert';
- id = await this.db.verificationInsert(dbCtx, { ...data, requestId });
- }
+ id = await this.db.verificationInsert(dbCtx, { ...data, requestId });
} catch (e) {
- this.logger.error(_scope, `${fn} failed`, { e, info, data, warn, id, requestId });
+ this.logger.error(_scope, 'verificationInsert failed', { error: e, data, warn, id, requestId });
throw e;
}
if (this.options.manager.processImmediately
&& id) {
try {
- if (data.mode === Enum.Mode.Publish) {
- fn = 'topicFetchClaimAndProcessById';
- await this.communication.topicFetchClaimAndProcessById(dbCtx, id, requestId);
- } else {
- fn = 'verificationClaimAndProcessById';
- await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId);
- }
+ await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId);
} catch (e) {
- this.logger.error(_scope, `${fn} failed`, { ...data, id, requestId });
+ this.logger.error(_scope, 'verificationClaimAndProcessById failed', { ...data, id, requestId });
// Don't bother re-throwing, as we've already ended this response.
}
}
manager = new Manager(stubLogger, stubDb, options);
sinon.stub(manager.communication, 'verificationProcess');
sinon.stub(manager.communication, 'topicFetchProcess');
+ sinon.stub(manager.communication, 'topicFetchClaimAndProcessById');
stubDb._reset();
stubLogger._reset();
});
await manager.getAdminOverview(res, ctx);
assert(res.end.called);
});
- });
+ }); // getAdminOverview
describe('getTopicDetails', function () {
it('covers', async function() {
});
}); // _checkMode
- describe('_checkPublish', function () {
- let dbCtx, data, warn, err, requestId;
+ describe('_publishTopics', function () {
+ let dbCtx, data, requestId;
beforeEach(function () {
dbCtx = {};
data = {};
- warn = [];
- err = [];
requestId = 'blah';
});
it('succeeds', async function () {
id: 222,
});
Object.assign(data, testData.validPublishRootData);
- await manager._checkPublish(dbCtx, data, warn, err, requestId);
- assert.strictEqual(warn.length, 0, 'unexpected warnings length');
- assert.strictEqual(err.length, 0, 'unexpected errors length');
- assert.strictEqual(data.topicId, 222, 'unexpected topic id');
+ const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+ assert.strictEqual(topicResults.length, 1);
+ assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length');
+ assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length');
+ assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id');
});
it('fails bad url', async function () {
Object.assign(data, testData.validPublishRootData, { topic: 'not_a_url' });
- await manager._checkPublish(dbCtx, data, warn, err, requestId);
- assert.strictEqual(err.length, 1, 'unexpected errors length');
- assert.strictEqual(warn.length, 0);
+ const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+ assert.strictEqual(topicResults.length, 1);
+ assert.strictEqual(topicResults[0].err.length, 1, 'unexpected errors length');
+ assert.strictEqual(topicResults[0].warn.length, 0);
});
it('accepts new public publish topic', async function () {
manager.db.topicGetByUrl.onCall(0).resolves().onCall(1).resolves({
id: 222,
});
Object.assign(data, testData.validPublishRootData);
- await manager._checkPublish(dbCtx, data, warn, err, requestId);
- assert.strictEqual(warn.length, 0, 'unexpected warnings length');
- assert.strictEqual(err.length, 0, 'unexpected errors length');
- assert.strictEqual(data.topicId, 222, 'unexpected topic id');
+ const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+ assert.strictEqual(topicResults.length, 1);
+ assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length');
+ assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length');
+ assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id');
});
it('does not publish deleted topic', async function () {
manager.db.topicGetByUrl.resolves({
isDeleted: true,
});
Object.assign(data, testData.validPublishRootData);
- await manager._checkPublish(dbCtx, data, warn, err, requestId);
- assert.strictEqual(warn.length, 0, 'unexpected warnings length');
- assert.strictEqual(err.length, 1, 'unexpected errors length');
- assert.strictEqual(data.topicId, undefined, 'unexpected topic id');
+ const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+ assert.strictEqual(topicResults.length, 1);
+ assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length');
+ assert.strictEqual(topicResults[0].err.length, 1, 'unexpected errors length');
+ assert.strictEqual(topicResults[0].topicId, undefined, 'unexpected topic id');
+ });
+ it('no topics', async function() {
+ Object.assign(data, testData.validPublishRootData);
+ delete data.topic;
+ const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+ assert.strictEqual(topicResults.length, 0);
+ });
+ it('multiple valid topics', async function () {
+ manager.db.topicGetByUrl.resolves({
+ id: 222,
+ });
+ Object.assign(data, testData.validPublishRootData);
+ data.url = ['https://example.com/first', 'https://example.com/second'];
+ data.topic = ['https://example.com/third'];
+ const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+ assert.strictEqual(topicResults.length, 3);
+ assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length');
+ assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length');
+ assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id');
+ assert.strictEqual(topicResults[1].warn.length, 0, 'unexpected warnings length');
+ assert.strictEqual(topicResults[1].err.length, 0, 'unexpected errors length');
+ assert.strictEqual(topicResults[1].topicId, 222, 'unexpected topic id');
+ assert.strictEqual(topicResults[2].warn.length, 0, 'unexpected warnings length');
+ assert.strictEqual(topicResults[2].err.length, 0, 'unexpected errors length');
+ assert.strictEqual(topicResults[2].topicId, 222, 'unexpected topic id');
+ });
+ it('mix of valid and invalid topics', async function () {
+ manager.db.topicGetByUrl.onCall(1).resolves().resolves({
+ id: 222,
+ });
+ Object.assign(data, testData.validPublishRootData);
+ data.url = ['https://example.com/first', 'not a url'];
+ data.topic = ['https://example.com/third'];
+ const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+ assert.strictEqual(topicResults.length, 3);
+ assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length');
+ assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length');
+ assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id');
+ assert.strictEqual(topicResults[1].warn.length, 0, 'unexpected warnings length');
+ assert.strictEqual(topicResults[1].err.length, 1, 'unexpected errors length');
+ assert.strictEqual(topicResults[1].topicId, undefined, 'unexpected topic id');
+ assert.strictEqual(topicResults[2].warn.length, 0, 'unexpected warnings length');
+ assert.strictEqual(topicResults[2].err.length, 0, 'unexpected errors length');
+ assert.strictEqual(topicResults[2].topicId, 222, 'unexpected topic id');
+ });
+ }); // _publishTopics
+
+ describe('_publishRequest', function () {
+ let dbCtx, data, res, ctx;
+ beforeEach(function () {
+ dbCtx = {};
+ data = {};
+ res = {
+ end: sinon.stub(),
+ };
+ ctx = {};
+ });
+ it('requires a topic', async function () {
+ try {
+ await manager._publishRequest(dbCtx, data, res, ctx);
+ assert.fail(noExpectedException);
+ } catch (e) {
+ assert(e instanceof Errors.ResponseError);
+ }
+ });
+ it('processes one topic', async function() {
+ manager.db.topicGetByUrl.resolves({
+ id: 222,
+ });
+ Object.assign(data, testData.validPublishRootData);
+ manager.db.topicFetchRequested.resolves();
+ await manager._publishRequest(dbCtx, data, res, ctx);
+ assert(manager.db.topicFetchRequested.called);
+ assert.strictEqual(res.statusCode, 202);
+ assert(res.end.called);
+ });
+ it('processes mix of valid and invalid topics', async function () {
+ ctx.responseType = 'application/json';
+ manager.db.topicGetByUrl.onCall(1).resolves().resolves({
+ id: 222,
+ });
+ Object.assign(data, testData.validPublishRootData);
+ data.url = ['https://example.com/first', 'not a url'];
+ data.topic = ['https://example.com/third'];
+ await manager._publishRequest(dbCtx, data, res, ctx);
+ assert.strictEqual(res.statusCode, 207);
+ assert(res.end.called);
+ });
+ it('covers topicFetchRequest failure', async function () {
+ manager.db.topicGetByUrl.resolves({
+ id: 222,
+ });
+ Object.assign(data, testData.validPublishRootData);
+ const expected = new Error('boo');
+ manager.db.topicFetchRequested.rejects(expected);
+ try {
+ await manager._publishRequest(dbCtx, data, res, ctx);
+ assert.fail(noExpectedException);
+ } catch (e) {
+ assert.deepStrictEqual(e, expected);
+ }
});
- }); // _checkPublish
+ it('covers immediate processing error', async function() {
+ manager.options.manager.processImmediately = true;
+ manager.db.topicGetByUrl.onCall(0).resolves().onCall(1).resolves({
+ id: 222,
+ });
+ manager.communication.topicFetchClaimAndProcessById.rejects();
+ Object.assign(data, testData.validPublishRootData);
+ await manager._publishRequest(dbCtx, data, res, ctx);
+ assert(manager.db.topicFetchRequested.called);
+ assert.strictEqual(res.statusCode, 202);
+ assert(res.end.called);
+ assert(manager.communication.topicFetchClaimAndProcessById.called)
+ });
+ it('covers no immediate processing', async function() {
+ manager.options.manager.processImmediately = false;
+ manager.db.topicGetByUrl.onCall(0).resolves().onCall(1).resolves({
+ id: 222,
+ });
+ Object.assign(data, testData.validPublishRootData);
+ await manager._publishRequest(dbCtx, data, res, ctx);
+ assert(manager.db.topicFetchRequested.called);
+ assert.strictEqual(res.statusCode, 202);
+ assert(res.end.called);
+ assert(!manager.communication.topicFetchClaimAndProcessById.called)
+ });
+ }); // _publishRequest
+
+ describe('multiPublishContent', function () {
+ let publishTopics;
+ beforeEach(function () {
+ publishTopics = [{
+ url: 'https://example.com/first',
+ warn: [],
+ err: [],
+ topicId: 222,
+ status: 202,
+ statusMessage: 'Accepted',
+ },
+ {
+ url: 'not a url',
+ warn: [],
+ err: [ 'invalid topic url (failed to parse url)' ],
+ topicId: undefined,
+ status: 400,
+ statusMessage: 'Bad Request',
+ }];
+ });
+ it('covers json response', function () {
+ ctx.responseType = 'application/json';
+ const expected = '[{"href":"https://example.com/first","status":202,"statusMessage":"Accepted","errors":[],"warnings":[]},{"href":"not a url","status":400,"statusMessage":"Bad Request","errors":["invalid topic url (failed to parse url)"],"warnings":[]}]';
+ const result = Manager.multiPublishContent(ctx, publishTopics);
+ assert.deepStrictEqual(result, expected);
+ });
+ it('covers text response', function () {
+ ctx.responseType = 'text/plain';
+ const expected = `https://example.com/first [202 Accepted]
+----
+not a url [400 Bad Request]
+\terror: invalid topic url (failed to parse url)`;
+ const result = Manager.multiPublishContent(ctx, publishTopics);
+ assert.deepStrictEqual(result, expected);
+ });
+ }); // multiPublishContent
describe('processTasks', function () {
it('covers', async function () {