/**
* GET request for root.
+ * @param {http.ClientRequest} req
* @param {http.ServerResponse} res
* @param {object} ctx
*/
if (data.topic) {
topic = await this.db.topicGetByUrl(dbCtx, data.topic);
- if (!topic && this.options.manager.publicHub) {
+ if (!topic && this._newTopicCreationAllowed()) {
this.logger.info(_scope, 'new topic from subscribe request', { data, requestId });
try {
/**
- * Check that a publish request topic is valid and exists,
- * and if it is, add topicId to data.
- * For a public publish request, create topic if not exists.
+ * Determine if a topic url is allowed to be created.
+ * In the future, this may be more complicated.
+ * @returns {Boolean}
+ */
+ _newTopicCreationAllowed() {
+ return this.options.manager.publicHub;
+ }
+
+
+ /**
+ * Check that a publish request's topic(s) are valid and exist,
+ * returning an array with the results for each.
+ * For a public-hub publish request, creates topics if they do not exist.
* @param {*} dbCtx
* @param {RootData} data
* @param {String[]} warn
* @param {String[]} err
* @param {String} requestId
*/
- async _checkPublish(dbCtx, data, warn, err, requestId) {
+ async _publishTopics(dbCtx, data, requestId) {
const _scope = _fileScope('_checkPublish');
- const publishUrl = data.url || data.topic;
+ // Publish requests may include multiple topics, consider them all, but deduplicate.
+ const publishUrls = Array.from(new Set([
+ ...common.ensureArray(data.url),
+ ...common.ensureArray(data.topic),
+ ]));
+
+ // Map the requested topics to their ids, creating if necessary.
+ return Promise.all(publishUrls.map(async (url) => {
+ const result = {
+ url,
+ warn: [],
+ err: [],
+ topicId: undefined,
+ };
+ let topic = await this.db.topicGetByUrl(dbCtx, url);
+ if (!topic && this._newTopicCreationAllowed()) {
+ try {
+ new URL(url);
+ } catch (e) {
+ result.err.push('invalid topic url (failed to parse url)');
+ return result;
+ }
+ await this.db.topicSet(dbCtx, {
+ // TODO: accept a publisherValidationUrl parameter
+ url,
+ });
+ topic = await this.db.topicGetByUrl(dbCtx, url);
+ this.logger.info(_scope, 'new topic from publish request', { url, requestId });
+ }
+ if (!topic || topic.isDeleted) {
+ result.err.push('topic not supported');
+ return result;
+ }
+ result.topicId = topic.id;
+ return result;
+ }));
+ }
- let topic = await this.db.topicGetByUrl(dbCtx, publishUrl);
- if (!topic && this.options.manager.publicHub) {
- this.logger.info(_scope, 'new topic from publish request', { data, requestId });
- try {
- new URL(publishUrl);
- } catch (e) {
- err.push('invalid topic url (failed to parse url)');
- return;
+ /**
+ * Render response for multi-topic publish requests.
+ * @param {Object[]} publishTopics
+ */
+ static multiPublishContent(ctx, publishTopics) {
+ const responses = publishTopics.map((topic) => ({
+ href: topic.url,
+ status: topic.status,
+ statusMessage: topic.statusMessage,
+ errors: topic.err,
+ warnings: topic.warn,
+ }));
+ switch (ctx.responseType) {
+ case Enum.ContentType.ApplicationJson:
+ return JSON.stringify(responses);
+
+ case Enum.ContentType.TextPlain:
+ default: {
+ const textResponses = responses.map((response) => {
+ const details = Manager._prettyDetails(response.errors, response.warnings);
+ const textDetails = (details.length ? '\n' : '') + details.map((d) => `\t${d}`).join('\n');
+ return `${response.href} [${response.status} ${response.statusMessage}]${textDetails}`;
+ });
+ return textResponses.join('\n----\n');
}
+ }
+ }
- await this.db.topicSet(dbCtx, {
- url: publishUrl,
- });
- topic = await this.db.topicGetByUrl(dbCtx, publishUrl);
+
+ /**
+ * Process a publish request.
+ * @param {*} dbCtx
+ * @param {Object} data
+ * @param {http.ServerResponse} res
+ * @param {Object} ctx
+ */
+ async _publishRequest(dbCtx, data, res, ctx) {
+ const _scope = _fileScope('_parsePublish');
+ this.logger.debug(_scope, 'called', { data });
+
+ const requestId = ctx.requestId;
+
+ // Parse and validate all the topics in the request.
+ data.publishTopics = await this._publishTopics(dbCtx, data, requestId);
+ if (!data.publishTopics || !data.publishTopics.length) {
+ const details = Manager._prettyDetails(['no valid topic urls to publish'], []);
+ throw new ResponseError(Enum.ErrorResponse.BadRequest, details);
}
- if (!topic || topic.isDeleted) {
- err.push('not a supported topic');
- return;
+ // Set status per topic
+ for (const topicResult of data.publishTopics) {
+ topicResult.status = topicResult.err.length ? 400 : 202;
+ topicResult.statusMessage = topicResult.err.length ? 'Bad Request' : 'Accepted';
}
- data.topicId = topic.id;
+ // Process the valid publish notifications
+ const validPublishTopics = data.publishTopics.filter((topicResult) => !topicResult.err.length);
+ try {
+ await Promise.all(validPublishTopics.map(async (topicResult) => this.db.topicFetchRequested(dbCtx, topicResult.topicId)));
+ } catch (e) {
+ this.logger.error(_scope, 'topicFetchRequest failed', { error: e, ctx, data, requestId });
+ throw e;
+ }
+
+ this.logger.info(_scope, 'request accepted', { ctx, data, requestId });
+
+ if (data.publishTopics.length === 1) {
+ const soleTopic = data.publishTopics[0];
+ res.statusCode = soleTopic.status;
+ res.end(Manager._prettyDetails(soleTopic.err, soleTopic.warn).join('\n'));
+ } else {
+ res.statusCode = 207;
+ res.end(Manager.multiPublishContent(ctx, data.publishTopics));
+ }
+
+ if (this.options.manager.processImmediately
+ && validPublishTopics.length) {
+ try {
+ await Promise.all(validPublishTopics.map(async (topicResult) => this.communication.topicFetchClaimAndProcessById(dbCtx, topicResult.topicId, requestId)));
+ } catch (e) {
+ this.logger.error(_scope, 'topicFetchClaimAndProcessById failed', { data, validPublishTopics, requestId });
+ // Don't bother re-throwing, as we've already ended this response.
+ }
+ }
+ }
+
+
+ /**
+ * Annotate any encountered issues.
+ * @param {String[]} err
+ * @param {String[]} warn
+ * @returns {String[]}
+ */
+ static _prettyDetails(err, warn) {
+ return [
+ ...err.map((entry) => `error: ${entry}`),
+ ...warn.map((entry) => `warning: ${entry}`),
+ ];
}
await this.db.context(async (dbCtx) => {
+ // Handle publish requests elsewhere
if (data.mode === Enum.Mode.Publish) {
- await this._checkPublish(dbCtx, data, warn, err, requestId);
- } else {
- await this._validateRootData(dbCtx, data, warn, err, requestId);
+ return this._publishRequest(dbCtx, data, res, ctx);
}
- const prettyErr = err.map((entry) => `error: ${entry}`);
- const prettyWarn = warn.map((entry) => `warning: ${entry}`);
- const details = prettyErr.concat(prettyWarn);
+ await this._validateRootData(dbCtx, data, warn, err, requestId);
+
+ const details = Manager._prettyDetails(err, warn);
// Any errors are fatal. Stop and report anything that went wrong.
if (err.length) {
}
// Commit the request for later processing.
- let fn, info, id;
+ let id;
try {
- if (data.mode === Enum.Mode.Publish) {
- fn = 'topicFetchRequested';
- info = await this.db.topicFetchRequested(dbCtx, data.topicId);
- id = data.topicId;
- } else {
- fn = 'verificationInsert';
- id = await this.db.verificationInsert(dbCtx, { ...data, requestId });
- }
+ id = await this.db.verificationInsert(dbCtx, { ...data, requestId });
} catch (e) {
- this.logger.error(_scope, `${fn} failed`, { e, info, data, warn, id, requestId });
+ this.logger.error(_scope, 'verificationInsert failed', { error: e, data, warn, id, requestId });
throw e;
}
if (this.options.manager.processImmediately
&& id) {
try {
- if (data.mode === Enum.Mode.Publish) {
- fn = 'topicFetchClaimAndProcessById';
- await this.communication.topicFetchClaimAndProcessById(dbCtx, id, requestId);
- } else {
- fn = 'verificationClaimAndProcessById';
- await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId);
- }
+ await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId);
} catch (e) {
- this.logger.error(_scope, `${fn} failed`, { ...data, id, requestId });
+ this.logger.error(_scope, 'verificationClaimAndProcessById failed', { ...data, id, requestId });
// Don't bother re-throwing, as we've already ended this response.
}
}
});
this.logger.debug(_scope, 'got topics', { topics: ctx.topics });
+ // Profile users can only see related topics.
+ if (ctx.session && ctx.session.authenticatedProfile) {
+ const profileUrlObj = new URL(ctx.session.authenticatedProfile);
+ ctx.topics = ctx.topics.filter((topic) => {
+ const topicUrlObj = new URL(topic.url);
+ return (topicUrlObj.hostname === profileUrlObj.hostname);
+ });
+ }
+
res.end(Template.adminOverviewHTML(ctx, this.options));
this.logger.info(_scope, 'finished', { ...ctx, topics: ctx.topics.length })
}
});
this.logger.debug(_scope, 'got topic details', { topic: ctx.topic, subscriptions: ctx.subscriptions });
+ // Profile users can only see related topics.
+ if (ctx.session && ctx.session.authenticatedProfile) {
+ const profileUrlObj = new URL(ctx.session.authenticatedProfile);
+ const topicUrlObj = new URL(ctx.topic.url);
+ if (topicUrlObj.hostname !== profileUrlObj.hostname) {
+ ctx.topic = null;
+ ctx.subscriptions = [];
+ }
+ }
+
res.end(Template.adminTopicDetailsHTML(ctx, this.options));
- this.logger.info(_scope, 'finished', { ...ctx, subscriptions: ctx.subscriptions.length, topic: ctx.topic.id });
+ this.logger.info(_scope, 'finished', { ...ctx, subscriptions: ctx.subscriptions.length, topic: ctx.topic && ctx.topic.id || ctx.topic });
}
await this.db.topicDeleted(txCtx, topicId);
res.end();
this.logger.info(_scope, 'topic set deleted', { ctx, topicId });
+ // Attempt to remove from db if no active subscriptions.
+ await this.db.topicPendingDelete(txCtx, topicId);
return;
}
this.logger.debug(_scope, 'called', { ctx });
// N.B. no await on this
- this.communication.worker.process();
+ this.communication.worker.process().catch((e) => {
+ this.logger.error(_scope, 'failed', { error: e, ctx });
+ });
res.end();
this.logger.info(_scope, 'invoked worker process', { ctx });