X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=src%2Fmanager.js;h=0f110682eb2b3f94ddd1fe0076dab22e620ce69a;hb=cab7ebc31583981d0c235039afdfc9d63e730f02;hp=6fb187cde3dc0da5854727ffbc9297199a752899;hpb=28de4364128a4b03918a8cbe868009b5d427220a;p=websub-hub diff --git a/src/manager.js b/src/manager.js index 6fb187c..0f11068 100644 --- a/src/manager.js +++ b/src/manager.js @@ -144,7 +144,7 @@ class Manager { if (data.topic) { topic = await this.db.topicGetByUrl(dbCtx, data.topic); - if (!topic && this.options.manager.publicHub) { + if (!topic && this._newTopicCreationAllowed()) { this.logger.info(_scope, 'new topic from subscribe request', { data, requestId }); try { @@ -263,43 +263,165 @@ class Manager { /** - * Check that a publish request topic is valid and exists, - * and if it is, add topicId to data. - * For a public publish request, create topic if not exists. + * Determine if a topic url is allowed to be created. + * In the future, this may be more complicated. + * @returns {Boolean} + */ + _newTopicCreationAllowed() { + return this.options.manager.publicHub; + } + + + /** + * Check that a publish request's topic(s) are valid and exist, + * returning an array with the results for each. + * For a public-hub publish request, creates topics if they do not exist. * @param {*} dbCtx * @param {RootData} data * @param {String[]} warn * @param {String[]} err * @param {String} requestId */ - async _checkPublish(dbCtx, data, warn, err, requestId) { + async _publishTopics(dbCtx, data, requestId) { const _scope = _fileScope('_checkPublish'); - const publishUrl = data.url || data.topic; + // Publish requests may include multiple topics, consider them all, but deduplicate. + const publishUrls = Array.from(new Set([ + ...common.ensureArray(data.url), + ...common.ensureArray(data.topic), + ])); + + // Map the requested topics to their ids, creating if necessary. + return Promise.all(publishUrls.map(async (url) => { + const result = { + url, + warn: [], + err: [], + topicId: undefined, + }; + let topic = await this.db.topicGetByUrl(dbCtx, url); + if (!topic && this._newTopicCreationAllowed()) { + try { + new URL(url); + } catch (e) { + result.err.push('invalid topic url (failed to parse url)'); + return result; + } + await this.db.topicSet(dbCtx, { + // TODO: accept a publisherValidationUrl parameter + url, + }); + topic = await this.db.topicGetByUrl(dbCtx, url); + this.logger.info(_scope, 'new topic from publish request', { url, requestId }); + } + if (!topic || topic.isDeleted) { + result.err.push('topic not supported'); + return result; + } + result.topicId = topic.id; + return result; + })); + } - let topic = await this.db.topicGetByUrl(dbCtx, publishUrl); - if (!topic && this.options.manager.publicHub) { - this.logger.info(_scope, 'new topic from publish request', { data, requestId }); - try { - new URL(publishUrl); - } catch (e) { - err.push('invalid topic url (failed to parse url)'); - return; + /** + * Render response for multi-topic publish requests. + * @param {Object[]} publishTopics + */ + static multiPublishContent(ctx, publishTopics) { + const responses = publishTopics.map((topic) => ({ + href: topic.url, + status: topic.status, + statusMessage: topic.statusMessage, + errors: topic.err, + warnings: topic.warn, + })); + switch (ctx.responseType) { + case Enum.ContentType.ApplicationJson: + return JSON.stringify(responses); + + case Enum.ContentType.TextPlain: + default: { + const textResponses = responses.map((response) => { + const details = Manager._prettyDetails(response.errors, response.warnings); + const textDetails = (details.length ? '\n' : '') + details.map((d) => `\t${d}`).join('\n'); + return `${response.href} [${response.status} ${response.statusMessage}]${textDetails}`; + }); + return textResponses.join('\n----\n'); } + } + } + + + /** + * Process a publish request. + * @param {*} dbCtx + * @param {Object} data + * @param {http.ServerResponse} res + * @param {Object} ctx + */ + async _publishRequest(dbCtx, data, res, ctx) { + const _scope = _fileScope('_parsePublish'); + this.logger.debug(_scope, 'called', { data }); + + const requestId = ctx.requestId; - await this.db.topicSet(dbCtx, { - url: publishUrl, - }); - topic = await this.db.topicGetByUrl(dbCtx, publishUrl); + // Parse and validate all the topics in the request. + data.publishTopics = await this._publishTopics(dbCtx, data, requestId); + if (!data.publishTopics || !data.publishTopics.length) { + const details = Manager._prettyDetails(['no valid topic urls to publish'], []); + throw new ResponseError(Enum.ErrorResponse.BadRequest, details); } - if (!topic || topic.isDeleted) { - err.push('not a supported topic'); - return; + // Set status per topic + for (const topicResult of data.publishTopics) { + topicResult.status = topicResult.err.length ? 400 : 202; + topicResult.statusMessage = topicResult.err.length ? 'Bad Request' : 'Accepted'; } - data.topicId = topic.id; + // Process the valid publish notifications + const validPublishTopics = data.publishTopics.filter((topicResult) => !topicResult.err.length); + try { + await Promise.all(validPublishTopics.map(async (topicResult) => this.db.topicFetchRequested(dbCtx, topicResult.topicId))); + } catch (e) { + this.logger.error(_scope, 'topicFetchRequest failed', { error: e, ctx, data, requestId }); + throw e; + } + + this.logger.info(_scope, 'request accepted', { ctx, data, requestId }); + + if (data.publishTopics.length === 1) { + const soleTopic = data.publishTopics[0]; + res.statusCode = soleTopic.status; + res.end(Manager._prettyDetails(soleTopic.err, soleTopic.warn).join('\n')); + } else { + res.statusCode = 207; + res.end(Manager.multiPublishContent(ctx, data.publishTopics)); + } + + if (this.options.manager.processImmediately + && validPublishTopics.length) { + try { + await Promise.all(validPublishTopics.map(async (topicResult) => this.communication.topicFetchClaimAndProcessById(dbCtx, topicResult.topicId, requestId))); + } catch (e) { + this.logger.error(_scope, 'topicFetchClaimAndProcessById failed', { data, validPublishTopics, requestId }); + // Don't bother re-throwing, as we've already ended this response. + } + } + } + + + /** + * Annotate any encountered issues. + * @param {String[]} err + * @param {String[]} warn + * @returns {String[]} + */ + static _prettyDetails(err, warn) { + return [ + ...err.map((entry) => `error: ${entry}`), + ...warn.map((entry) => `warning: ${entry}`), + ]; } @@ -322,15 +444,14 @@ class Manager { await this.db.context(async (dbCtx) => { + // Handle publish requests elsewhere if (data.mode === Enum.Mode.Publish) { - await this._checkPublish(dbCtx, data, warn, err, requestId); - } else { - await this._validateRootData(dbCtx, data, warn, err, requestId); + return this._publishRequest(dbCtx, data, res, ctx); } - const prettyErr = err.map((entry) => `error: ${entry}`); - const prettyWarn = warn.map((entry) => `warning: ${entry}`); - const details = prettyErr.concat(prettyWarn); + await this._validateRootData(dbCtx, data, warn, err, requestId); + + const details = Manager._prettyDetails(err, warn); // Any errors are fatal. Stop and report anything that went wrong. if (err.length) { @@ -339,18 +460,11 @@ class Manager { } // Commit the request for later processing. - let fn, info, id; + let id; try { - if (data.mode === Enum.Mode.Publish) { - fn = 'topicPublish'; - info = await this.db.topicFetchRequested(dbCtx, data.topicId); - id = data.topicId; - } else { - fn = 'verificationInsert'; - id = await this.db.verificationInsert(dbCtx, { ...data, requestId }); - } + id = await this.db.verificationInsert(dbCtx, { ...data, requestId }); } catch (e) { - this.logger.error(_scope, `${fn} failed`, { e, info, data, warn, id, requestId }); + this.logger.error(_scope, 'verificationInsert failed', { error: e, data, warn, id, requestId }); throw e; } @@ -362,15 +476,9 @@ class Manager { if (this.options.manager.processImmediately && id) { try { - if (data.mode === Enum.Mode.Publish) { - fn = 'topicFetchClaimAndProcessById'; - await this.communication.topicFetchClaimAndProcessById(dbCtx, id, requestId); - } else { - fn = 'processVerification'; - await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId); - } + await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId); } catch (e) { - this.logger.error(_scope, `${fn} failed`, { ...data, id, requestId }); + this.logger.error(_scope, 'verificationClaimAndProcessById failed', { ...data, id, requestId }); // Don't bother re-throwing, as we've already ended this response. } } @@ -666,7 +774,9 @@ class Manager { this.logger.debug(_scope, 'called', { ctx }); // N.B. no await on this - this.communication.worker.process(); + this.communication.worker.process().catch((e) => { + this.logger.error(_scope, 'failed', { error: e, ctx }); + }); res.end(); this.logger.info(_scope, 'invoked worker process', { ctx });