this.db = db;
this.options = options;
this.communication = new Communication(logger, db, options);
-
- // Precalculate the invariant root GET metadata.
- this.getRootContent = Template.rootHTML(undefined, options);
- const now = new Date();
- this.startTimeString = now.toGMTString();
- this.startTimeMs = now.getTime();
- this.getRootETag = common.generateETag(undefined, undefined, this.getRootContent);
}
+ /**
+ * @typedef {import('node:http')} http
+ */
/**
* GET request for healthcheck.
- * @param {http.ServerResponse} res
- * @param {object} ctx
+ * @param {http.ServerResponse} res response
+ * @param {object} ctx context
*/
async getHealthcheck(res, ctx) {
const _scope = _fileScope('getHealthcheck');
/**
* GET request for root.
- * @param {http.ServerResponse} res
- * @param {object} ctx
+ * @param {http.ClientRequest} req request
+ * @param {http.ServerResponse} res response
+ * @param {object} ctx context
*/
async getRoot(req, res, ctx) {
const _scope = _fileScope('getRoot');
this.logger.debug(_scope, 'called', { ctx });
- res.setHeader(Enum.Header.LastModified, this.startTimeString);
- res.setHeader(Enum.Header.ETag, this.getRootETag);
-
- if (common.isClientCached(req, this.startTimeMs, this.getRootETag)) {
- this.logger.debug(_scope, 'client cached response', { ctx });
- res.statusCode = 304;
- res.end();
- return;
- }
- res.end(this.getRootContent);
+ const content = Template.rootHTML(ctx, this.options);
+ res.end(content);
this.logger.info(_scope, 'finished', { ctx });
}
- /** All the fields the root handler deals with.
+ /**
+ * All the fields the root handler deals with.
* @typedef {object} RootData
- * @property {string} callback - url
- * @property {string} mode
- * @property {string} topic
- * @property {number} topicId
- * @property {string} leaseSeconds
- * @property {string} secret
- * @property {string} httpRemoteAddr
- * @property {string} httpFrom
- * @property {boolean} isSecure
- * @property {boolean} isPublisherValidated
+ * @property {string} callback url
+ * @property {string} mode mode
+ * @property {string} topic topic
+ * @property {number} topicId topic id
+ * @property {string} leaseSeconds lease seconds
+ * @property {string} secret secret
+ * @property {string} httpRemoteAddr remote address
+ * @property {string} httpFrom from
+ * @property {boolean} isSecure is secure
+ * @property {boolean} isPublisherValidated is published validated
*/
/**
* Extract api parameters.
- * @param {http.ClientRequest} req
- * @param {Object} ctx
- * @returns {RootData}
+ * @param {http.ClientRequest} req request
+ * @param {object} ctx context
+ * @returns {RootData} root data
*/
static _getRootData(req, ctx) {
const postData = ctx.parsedBody;
/**
*
- * @param {*} dbCtx
- * @param {RootData} data
- * @param {String[]} warn
- * @param {String[]} err
- * @param {String} requestId
+ * @param {*} dbCtx db context
+ * @param {RootData} data root data
+ * @param {string[]} warn warnings
+ * @param {string[]} err errors
+ * @param {string} requestId request id
+ * @returns {Promise<void>}
*/
async _validateRootData(dbCtx, data, warn, err, requestId) {
// These checks can modify data, so order matters.
* Check that requested topic exists and values are in range.
* Sets topic id, publisher validation state, and requested lease
* seconds on data.
- * @param {*} dbCtx
- * @param {RootData} data
- * @param {String[]} warn
- * @param {String[]} err
+ * @param {*} dbCtx db context
+ * @param {RootData} data root data
+ * @param {string[]} warn warnings
+ * @param {string[]} err errors
+ * @param {string} requestId request id
+ * @returns {Promise<void>}
*/
async _checkTopic(dbCtx, data, warn, err, requestId) {
const _scope = _fileScope('_checkTopic');
if (data.topic) {
topic = await this.db.topicGetByUrl(dbCtx, data.topic);
- if (!topic && this.options.manager.publicHub) {
+ if (!topic && this._newTopicCreationAllowed()) {
this.logger.info(_scope, 'new topic from subscribe request', { data, requestId });
try {
new URL(data.topic);
- } catch (e) {
+ } catch (e) { // eslint-disable-line no-unused-vars
err.push('invalid topic url (failed to parse url)');
return;
}
if (data.leaseSeconds === undefined || isNaN(data.leaseSeconds)) {
data.leaseSeconds = topic.leaseSecondsPreferred;
- } else {
- if (data.leaseSeconds > topic.leaseSecondsMax) {
- data.leaseSeconds = topic.leaseSecondsMax;
- warn.push(`requested lease too long, using ${data.leaseSeconds}`);
- } else if (data.leaseSeconds < topic.leaseSecondsMin) {
- data.leaseSeconds = topic.leaseSecondsMin;
- warn.push(`requested lease too short, using ${data.leaseSeconds}`);
- }
+ } else if (data.leaseSeconds > topic.leaseSecondsMax) {
+ data.leaseSeconds = topic.leaseSecondsMax;
+ warn.push(`requested lease too long, using ${data.leaseSeconds}`);
+ } else if (data.leaseSeconds < topic.leaseSecondsMin) {
+ data.leaseSeconds = topic.leaseSecondsMin;
+ warn.push(`requested lease too short, using ${data.leaseSeconds}`);
}
if (topic.publisherValidationUrl) {
/**
* Check data for valid callback url and scheme constraints.
- * @param {RootData} data
- * @param {String[]} warn
- * @param {String[]} err
+ * @param {RootData} data root data
+ * @param {string[]} warn warnings
+ * @param {string[]} err errors
*/
_checkCallbackAndSecrets(data, warn, err) {
let isCallbackSecure = false;
try {
const c = new URL(data.callback);
isCallbackSecure = (c.protocol.toLowerCase() === 'https:'); // Colon included because url module is weird
- } catch (e) {
+ } catch (e) { // eslint-disable-line no-unused-vars
err.push('invalid callback url (failed to parse url');
return;
}
/**
* Check mode validity and subscription requirements.
* Publish mode is handled elsewhere in the flow.
- * @param {*} dbCtx
- * @param {RootData} data
- * @param {String[]} warn
- * @param {String[]} err
- * @param {String} requestId
+ * @param {*} dbCtx db context
+ * @param {RootData} data root data
+ * @param {string[]} warn warnings
+ * @param {string[]} err errors
+ * @returns {Promise<void>}
*/
async _checkMode(dbCtx, data, warn, err) {
switch (data.mode) {
}
if (s === undefined) {
err.push('not subscribed');
- } else {
- if (s.expires < currentEpoch) {
- err.push('subscription already expired');
- }
+ } else if (s.expires < currentEpoch) {
+ err.push('subscription already expired');
}
+
break;
}
/**
- * Check that a publish request topic is valid and exists,
- * and if it is, add topicId to data.
- * For a public publish request, create topic if not exists.
- * @param {*} dbCtx
- * @param {RootData} data
- * @param {String[]} warn
- * @param {String[]} err
- * @param {String} requestId
+ * Determine if a topic url is allowed to be created.
+ * In the future, this may be more complicated.
+ * @returns {boolean} is public hub
+ */
+ _newTopicCreationAllowed() {
+ return this.options.manager.publicHub;
+ }
+
+
+ /**
+ * Check that a publish request's topic(s) are valid and exist,
+ * returning an array with the results for each.
+ * For a public-hub publish request, creates topics if they do not exist.
+ * @param {*} dbCtx db context
+ * @param {RootData} data root data
+ * @param {string} requestId request id
+ * @returns {Promise<object[]>} results
*/
- async _checkPublish(dbCtx, data, warn, err, requestId) {
+ async _publishTopics(dbCtx, data, requestId) {
const _scope = _fileScope('_checkPublish');
- const publishUrl = data.url || data.topic;
+ // Publish requests may include multiple topics, consider them all, but deduplicate.
+ const publishUrls = Array.from(new Set([
+ ...common.ensureArray(data.url),
+ ...common.ensureArray(data.topic),
+ ]));
+
+ // Map the requested topics to their ids, creating if necessary.
+ return Promise.all(publishUrls.map(async (url) => {
+ const result = {
+ url,
+ warn: [],
+ err: [],
+ topicId: undefined,
+ };
+ let topic = await this.db.topicGetByUrl(dbCtx, url);
+ if (!topic && this._newTopicCreationAllowed()) {
+ try {
+ new URL(url);
+ } catch (e) { // eslint-disable-line no-unused-vars
+ result.err.push('invalid topic url (failed to parse url)');
+ return result;
+ }
+ await this.db.topicSet(dbCtx, {
+ // TODO: accept a publisherValidationUrl parameter
+ url,
+ });
+ topic = await this.db.topicGetByUrl(dbCtx, url);
+ this.logger.info(_scope, 'new topic from publish request', { url, requestId });
+ }
+ if (!topic || topic.isDeleted) {
+ result.err.push('topic not supported');
+ return result;
+ }
+ result.topicId = topic.id;
+ return result;
+ }));
+ }
- let topic = await this.db.topicGetByUrl(dbCtx, publishUrl);
- if (!topic && this.options.manager.publicHub) {
- this.logger.info(_scope, 'new topic from publish request', { data, requestId });
- try {
- new URL(publishUrl);
- } catch (e) {
- err.push('invalid topic url (failed to parse url)');
- return;
+ /**
+ * Render response for multi-topic publish requests.
+ * @param {object} ctx context
+ * @param {object[]} publishTopics topics
+ * @returns {string} response content
+ */
+ static multiPublishContent(ctx, publishTopics) {
+ const responses = publishTopics.map((topic) => ({
+ href: topic.url,
+ status: topic.status,
+ statusMessage: topic.statusMessage,
+ errors: topic.err,
+ warnings: topic.warn,
+ }));
+ switch (ctx.responseType) {
+ case Enum.ContentType.ApplicationJson:
+ return JSON.stringify(responses);
+
+ case Enum.ContentType.TextPlain:
+ default: {
+ const textResponses = responses.map((response) => {
+ const details = Manager._prettyDetails(response.errors, response.warnings);
+ const textDetails = (details.length ? '\n' : '') + details.map((d) => `\t${d}`).join('\n');
+ return `${response.href} [${response.status} ${response.statusMessage}]${textDetails}`;
+ });
+ return textResponses.join('\n----\n');
}
+ }
+ }
- await this.db.topicSet(dbCtx, {
- url: publishUrl,
- });
- topic = await this.db.topicGetByUrl(dbCtx, publishUrl);
+
+ /**
+ * Process a publish request.
+ * @param {*} dbCtx db context
+ * @param {object} data data
+ * @param {http.ServerResponse} res response
+ * @param {object} ctx context
+ */
+ async _publishRequest(dbCtx, data, res, ctx) {
+ const _scope = _fileScope('_parsePublish');
+ this.logger.debug(_scope, 'called', { data });
+
+ const requestId = ctx.requestId;
+
+ // Parse and validate all the topics in the request.
+ data.publishTopics = await this._publishTopics(dbCtx, data, requestId);
+ if (!data?.publishTopics?.length) {
+ const details = Manager._prettyDetails(['no valid topic urls to publish'], []);
+ throw new ResponseError(Enum.ErrorResponse.BadRequest, details);
}
- if (!topic || topic.isDeleted) {
- err.push('not a supported topic');
- return;
+ // Set status per topic
+ for (const topicResult of data.publishTopics) {
+ topicResult.status = topicResult.err.length ? 400 : 202;
+ topicResult.statusMessage = topicResult.err.length ? 'Bad Request' : 'Accepted';
}
- data.topicId = topic.id;
+ // Process the valid publish notifications
+ const validPublishTopics = data.publishTopics.filter((topicResult) => !topicResult.err.length);
+ try {
+ await Promise.all(validPublishTopics.map(async (topicResult) => this.db.topicFetchRequested(dbCtx, topicResult.topicId)));
+ } catch (e) {
+ this.logger.error(_scope, 'topicFetchRequest failed', { error: e, ctx, data, requestId });
+ throw e;
+ }
+
+ this.logger.info(_scope, 'request accepted', { ctx, data, requestId });
+
+ if (data.publishTopics.length === 1) {
+ const soleTopic = data.publishTopics[0];
+ res.statusCode = soleTopic.status;
+ res.end(Manager._prettyDetails(soleTopic.err, soleTopic.warn).join('\n'));
+ } else {
+ res.statusCode = 207;
+ res.end(Manager.multiPublishContent(ctx, data.publishTopics));
+ }
+
+ if (this.options.manager.processImmediately
+ && validPublishTopics.length) {
+ try {
+ await Promise.all(validPublishTopics.map(async (topicResult) => this.communication.topicFetchClaimAndProcessById(dbCtx, topicResult.topicId, requestId)));
+ } catch (e) { // eslint-disable-line no-unused-vars
+ this.logger.error(_scope, 'topicFetchClaimAndProcessById failed', { data, validPublishTopics, requestId });
+ // Don't bother re-throwing, as we've already ended this response.
+ }
+ }
+ }
+
+
+ /**
+ * Annotate any encountered issues.
+ * @param {string[]} err errors
+ * @param {string[]} warn warnings
+ * @returns {string[]} rendered list of errors and warnings
+ */
+ static _prettyDetails(err, warn) {
+ return [
+ ...err.map((entry) => `error: ${entry}`),
+ ...warn.map((entry) => `warning: ${entry}`),
+ ];
}
/**
* POST request for root.
- * @param {http.ClientRequest} req
- * @param {http.ServerResponse} res
- * @param {object} ctx
+ * @param {http.ClientRequest} req request
+ * @param {http.ServerResponse} res response
+ * @param {object} ctx context
*/
async postRoot(req, res, ctx) {
const _scope = _fileScope('postRoot');
await this.db.context(async (dbCtx) => {
+ // Handle publish requests elsewhere
if (data.mode === Enum.Mode.Publish) {
- await this._checkPublish(dbCtx, data, warn, err, requestId);
- } else {
- await this._validateRootData(dbCtx, data, warn, err, requestId);
+ return this._publishRequest(dbCtx, data, res, ctx);
}
- const prettyErr = err.map((entry) => `error: ${entry}`);
- const prettyWarn = warn.map((entry) => `warning: ${entry}`);
- const details = prettyErr.concat(prettyWarn);
+ await this._validateRootData(dbCtx, data, warn, err, requestId);
+
+ const details = Manager._prettyDetails(err, warn);
// Any errors are fatal. Stop and report anything that went wrong.
if (err.length) {
}
// Commit the request for later processing.
- let fn, info, id;
+ let id;
try {
- if (data.mode === Enum.Mode.Publish) {
- fn = 'topicPublish';
- info = await this.db.topicFetchRequested(dbCtx, data.topicId);
- id = data.topicId;
- } else {
- fn = 'verificationInsert';
- id = await this.db.verificationInsert(dbCtx, { ...data, requestId });
- }
+ id = await this.db.verificationInsert(dbCtx, { ...data, requestId });
} catch (e) {
- this.logger.error(_scope, `${fn} failed`, { e, info, data, warn, id, requestId });
+ this.logger.error(_scope, 'verificationInsert failed', { error: e, data, warn, id, requestId });
throw e;
}
if (this.options.manager.processImmediately
&& id) {
try {
- if (data.mode === Enum.Mode.Publish) {
- fn = 'topicFetchClaimAndProcessById';
- await this.communication.topicFetchClaimAndProcessById(dbCtx, id, requestId);
- } else {
- fn = 'processVerification';
- await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId);
- }
- } catch (e) {
- this.logger.error(_scope, `${fn} failed`, { ...data, id, requestId });
+ await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId);
+ } catch (e) { // eslint-disable-line no-unused-vars
+ this.logger.error(_scope, 'verificationClaimAndProcessById failed', { ...data, id, requestId });
// Don't bother re-throwing, as we've already ended this response.
}
}
/**
* Render topic info content.
- * @param {Object} ctx
- * @param {String} ctx.responseType
- * @param {String} ctx.topicUrl
- * @param {Number} ctx.count
- * @returns {String}
+ * @param {object} ctx context
+ * @param {string} ctx.responseType response type
+ * @param {string} ctx.topicUrl topic url
+ * @param {number} ctx.count count of subscribers
+ * @returns {string} response content
*/
// eslint-disable-next-line class-methods-use-this
infoContent(ctx) {
- // eslint-disable-next-line sonarjs/no-small-switch
+
switch (ctx.responseType) {
case Enum.ContentType.ApplicationJson:
return JSON.stringify({
/**
* GET request for /info?topic=url&format=type
- * @param {http.ServerResponse} res
- * @param {object} ctx
+ * @param {http.ServerResponse} res response
+ * @param {object} ctx context
*/
async getInfo(res, ctx) {
const _scope = _fileScope('getInfo');
try {
new URL(ctx.topicUrl);
- } catch (e) {
+ } catch (e) { // eslint-disable-line no-unused-vars
throw new ResponseError(Enum.ErrorResponse.BadRequest, 'invalid topic');
}
ctx.count = count.count;
});
- res.end(this.infoContent(ctx));
- this.logger.info(_scope, 'finished', { ...ctx });
+ const content = this.infoContent(ctx);
+ res.setHeader(Enum.Header.ETag, common.generateETag(undefined, undefined, content));
+ res.setHeader(Enum.Header.CacheControl, 'no-cache');
+ res.end(content);
+ this.logger.info(_scope, 'finished', { ctx });
+ }
+
+
+ /**
+ * label the bars of the topic update history graph
+ * @param {number} index index
+ * @param {number} value value
+ * @returns {string} caption
+ */
+ static _historyBarCaption(index, value) {
+ let when;
+ switch (index) {
+ case 0:
+ when ='today';
+ break;
+ case 1:
+ when = 'yesterday';
+ break;
+ default:
+ when = `${index} days ago`;
+ }
+ return `${when}, ${value || 'no'} update${value === 1 ? '': 's'}`;
+ }
+
+
+ /**
+ * GET SVG chart of topic update history
+ * @param {http.ServerResponse} res response
+ * @param {object} ctx context
+ */
+ async getHistorySVG(res, ctx) {
+ const _scope = _fileScope('getHistorySVG');
+ this.logger.debug(_scope, 'called', { ctx });
+
+ const days = Math.min(parseInt(ctx.queryParams.days) || this.options.manager.publishHistoryDays, 365);
+ const histOptions = {
+ title: 'Topic Publish History',
+ description: 'Updates per Day',
+ labelZero: '^ Today',
+ labelX: 'Days Ago',
+ maxItems: days,
+ minItems: days,
+ tickEvery: 7,
+ barWidth: 25,
+ barHeight: 40,
+ labelHeight: 12,
+ barCaptionFn: Manager._historyBarCaption,
+ };
+
+ let publishHistory;
+ await this.db.context(async (dbCtx) => {
+ publishHistory = await this.db.topicPublishHistory(dbCtx, ctx.params.topicId, days);
+ });
+
+ res.end(Template.histogramSVG(publishHistory, histOptions));
+ this.logger.info(_scope, 'finished', { ctx });
+ }
+
+
+ /**
+ * Determine if a profile url matches enough of a topic url to describe control over it.
+ * Topic must match hostname and start with the profile's path.
+ * @param {URL} profileUrlObj profile url
+ * @param {URL} topicUrlObj topic url
+ * @returns {boolean} profile is super-url of topic
+ */
+ static _profileControlsTopic(profileUrlObj, topicUrlObj) {
+ const hostnameMatches = profileUrlObj.hostname === topicUrlObj.hostname;
+ const pathIsPrefix = topicUrlObj.pathname.startsWith(profileUrlObj.pathname);
+ return hostnameMatches && pathIsPrefix;
}
/**
* GET request for authorized /admin information.
- * @param {http.ServerResponse} res
- * @param {object} ctx
+ * @param {http.ServerResponse} res response
+ * @param {object} ctx context
*/
async getAdminOverview(res, ctx) {
const _scope = _fileScope('getAdminOverview');
});
this.logger.debug(_scope, 'got topics', { topics: ctx.topics });
+ // Profile users can only see related topics.
+ if (ctx?.session?.authenticatedProfile) {
+ const profileUrlObj = new URL(ctx.session.authenticatedProfile);
+ ctx.topics = ctx.topics.filter((topic) => {
+ const topicUrlObj = new URL(topic.url);
+ return Manager._profileControlsTopic(profileUrlObj, topicUrlObj);
+ });
+ }
+
res.end(Template.adminOverviewHTML(ctx, this.options));
- this.logger.info(_scope, 'finished', { ...ctx, topics: ctx.topics.length })
+ this.logger.info(_scope, 'finished', { ctx, topics: ctx.topics.length });
}
/**
* GET request for authorized /admin/topic/:topicId information.
- * @param {http.ServerResponse} res
- * @param {object} ctx
+ * @param {http.ServerResponse} res response
+ * @param {object} ctx context
*/
async getTopicDetails(res, ctx) {
const _scope = _fileScope('getTopicDetails');
this.logger.debug(_scope, 'called', { ctx });
+ ctx.publishSpan = 60; // FIXME: configurable
const topicId = ctx.params.topicId;
+ let publishHistory;
await this.db.context(async (dbCtx) => {
ctx.topic = await this.db.topicGetById(dbCtx, topicId);
ctx.subscriptions = await this.db.subscriptionsByTopicId(dbCtx, topicId);
+ publishHistory = await this.db.topicPublishHistory(dbCtx, topicId, ctx.publishSpan);
});
- this.logger.debug(_scope, 'got topic details', { topic: ctx.topic, subscriptions: ctx.subscriptions });
+ ctx.publishCount = publishHistory.reduce((a, b) => a + b, 0);
+ ctx.subscriptionsDelivered = ctx.subscriptions.filter((subscription) => {
+ return subscription.latestContentDelivered >= ctx.topic.contentUpdated;
+ }).length;
+ this.logger.debug(_scope, 'got topic details', { topic: ctx.topic, subscriptions: ctx.subscriptions, updates: ctx.publishCount });
+
+ // Profile users can only see related topics.
+ if (ctx?.session?.authenticatedProfile) {
+ const profileUrlObj = new URL(ctx.session.authenticatedProfile);
+ const topicUrlObj = new URL(ctx.topic.url);
+ if (!Manager._profileControlsTopic(profileUrlObj, topicUrlObj)) {
+ ctx.topic = null;
+ ctx.subscriptions = [];
+ }
+ }
res.end(Template.adminTopicDetailsHTML(ctx, this.options));
- this.logger.info(_scope, 'finished', { ...ctx, subscriptions: ctx.subscriptions.length, topic: ctx.topic.id });
+ this.logger.info(_scope, 'finished', { ctx, subscriptions: ctx.subscriptions.length, topic: ctx?.topic?.id || ctx.topic });
}
/**
* PATCH and DELETE for updating topic data.
- * @param {http.ServerResponse} res
- * @param {Object} ctx
+ * @param {http.ServerResponse} res response
+ * @param {object} ctx context
*/
async updateTopic(res, ctx) {
const _scope = _fileScope('updateTopic');
await this.db.topicDeleted(txCtx, topicId);
res.end();
this.logger.info(_scope, 'topic set deleted', { ctx, topicId });
+ // Attempt to remove from db if no active subscriptions.
+ await this.db.topicPendingDelete(txCtx, topicId);
return;
}
/**
* PATCH and DELETE for updating subscription data.
- * @param {http.ServerResponse} res
- * @param {Object} ctx
+ * @param {http.ServerResponse} res response
+ * @param {object} ctx context
*/
async updateSubscription(res, ctx) {
const _scope = _fileScope('updateSubscription');
/**
* POST request for manually running worker.
- * @param {http.ServerResponse} res
- * @param {object} ctx
+ * @param {http.ServerResponse} res response
+ * @param {object} ctx context
*/
async processTasks(res, ctx) {
- const _scope = _fileScope('getTopicDetails');
+ const _scope = _fileScope('processTasks');
this.logger.debug(_scope, 'called', { ctx });
// N.B. no await on this
- this.communication.worker.process();
+ this.communication.worker.process().catch((e) => {
+ this.logger.error(_scope, 'failed', { error: e, ctx });
+ });
res.end();
this.logger.info(_scope, 'invoked worker process', { ctx });