X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=src%2Fmanager.js;h=74730c27be142a5ac33e467fccec182d61bcefde;hb=HEAD;hp=b3aff45da975952ea8ad27d51aa058823348cbdd;hpb=9696c012e6b9a6c58904baa397ca0ebf78112316;p=websub-hub diff --git a/src/manager.js b/src/manager.js index b3aff45..74730c2 100644 --- a/src/manager.js +++ b/src/manager.js @@ -25,13 +25,6 @@ class Manager { this.db = db; this.options = options; this.communication = new Communication(logger, db, options); - - // Precalculate the invariant root GET metadata. - this.getRootContent = Template.rootHTML(undefined, options); - const now = new Date(); - this.startTimeString = now.toGMTString(); - this.startTimeMs = now.getTime(); - this.getRootETag = common.generateETag(undefined, undefined, this.getRootContent); } @@ -40,7 +33,7 @@ class Manager { * @param {http.ServerResponse} res * @param {object} ctx */ - async getHealthcheck(res, ctx) { + async getHealthcheck(res, ctx) { const _scope = _fileScope('getHealthcheck'); const health = 'happy'; @@ -53,6 +46,7 @@ class Manager { /** * GET request for root. + * @param {http.ClientRequest} req * @param {http.ServerResponse} res * @param {object} ctx */ @@ -60,16 +54,8 @@ class Manager { const _scope = _fileScope('getRoot'); this.logger.debug(_scope, 'called', { ctx }); - res.setHeader(Enum.Header.LastModified, this.startTimeString); - res.setHeader(Enum.Header.ETag, this.getRootETag); - - if (common.isClientCached(req, this.startTimeMs, this.getRootETag)) { - this.logger.debug(_scope, 'client cached response', { ctx }); - res.statusCode = 304; - res.end(); - return; - } - res.end(this.getRootContent); + const content = Template.rootHTML(ctx, this.options); + res.end(content); this.logger.info(_scope, 'finished', { ctx }); } @@ -144,7 +130,7 @@ class Manager { if (data.topic) { topic = await this.db.topicGetByUrl(dbCtx, data.topic); - if (!topic && this.options.manager.publicHub) { + if (!topic && this._newTopicCreationAllowed()) { this.logger.info(_scope, 'new topic from subscribe request', { data, requestId }); try { @@ -263,43 +249,165 @@ class Manager { /** - * Check that a publish request topic is valid and exists, - * and if it is, add topicId to data. - * For a public publish request, create topic if not exists. + * Determine if a topic url is allowed to be created. + * In the future, this may be more complicated. + * @returns {Boolean} + */ + _newTopicCreationAllowed() { + return this.options.manager.publicHub; + } + + + /** + * Check that a publish request's topic(s) are valid and exist, + * returning an array with the results for each. + * For a public-hub publish request, creates topics if they do not exist. * @param {*} dbCtx * @param {RootData} data * @param {String[]} warn * @param {String[]} err * @param {String} requestId */ - async _checkPublish(dbCtx, data, warn, err, requestId) { + async _publishTopics(dbCtx, data, requestId) { const _scope = _fileScope('_checkPublish'); - const publishUrl = data.url || data.topic; + // Publish requests may include multiple topics, consider them all, but deduplicate. + const publishUrls = Array.from(new Set([ + ...common.ensureArray(data.url), + ...common.ensureArray(data.topic), + ])); + + // Map the requested topics to their ids, creating if necessary. + return Promise.all(publishUrls.map(async (url) => { + const result = { + url, + warn: [], + err: [], + topicId: undefined, + }; + let topic = await this.db.topicGetByUrl(dbCtx, url); + if (!topic && this._newTopicCreationAllowed()) { + try { + new URL(url); + } catch (e) { + result.err.push('invalid topic url (failed to parse url)'); + return result; + } + await this.db.topicSet(dbCtx, { + // TODO: accept a publisherValidationUrl parameter + url, + }); + topic = await this.db.topicGetByUrl(dbCtx, url); + this.logger.info(_scope, 'new topic from publish request', { url, requestId }); + } + if (!topic || topic.isDeleted) { + result.err.push('topic not supported'); + return result; + } + result.topicId = topic.id; + return result; + })); + } - let topic = await this.db.topicGetByUrl(dbCtx, publishUrl); - if (!topic && this.options.manager.publicHub) { - this.logger.info(_scope, 'new topic from publish request', { data, requestId }); - try { - new URL(publishUrl); - } catch (e) { - err.push('invalid topic url (failed to parse url)'); - return; + /** + * Render response for multi-topic publish requests. + * @param {Object[]} publishTopics + */ + static multiPublishContent(ctx, publishTopics) { + const responses = publishTopics.map((topic) => ({ + href: topic.url, + status: topic.status, + statusMessage: topic.statusMessage, + errors: topic.err, + warnings: topic.warn, + })); + switch (ctx.responseType) { + case Enum.ContentType.ApplicationJson: + return JSON.stringify(responses); + + case Enum.ContentType.TextPlain: + default: { + const textResponses = responses.map((response) => { + const details = Manager._prettyDetails(response.errors, response.warnings); + const textDetails = (details.length ? '\n' : '') + details.map((d) => `\t${d}`).join('\n'); + return `${response.href} [${response.status} ${response.statusMessage}]${textDetails}`; + }); + return textResponses.join('\n----\n'); } + } + } - await this.db.topicSet(dbCtx, { - url: publishUrl, - }); - topic = await this.db.topicGetByUrl(dbCtx, publishUrl); + + /** + * Process a publish request. + * @param {*} dbCtx + * @param {Object} data + * @param {http.ServerResponse} res + * @param {Object} ctx + */ + async _publishRequest(dbCtx, data, res, ctx) { + const _scope = _fileScope('_parsePublish'); + this.logger.debug(_scope, 'called', { data }); + + const requestId = ctx.requestId; + + // Parse and validate all the topics in the request. + data.publishTopics = await this._publishTopics(dbCtx, data, requestId); + if (!data.publishTopics || !data.publishTopics.length) { + const details = Manager._prettyDetails(['no valid topic urls to publish'], []); + throw new ResponseError(Enum.ErrorResponse.BadRequest, details); } - if (!topic || topic.isDeleted) { - err.push('not a supported topic'); - return; + // Set status per topic + for (const topicResult of data.publishTopics) { + topicResult.status = topicResult.err.length ? 400 : 202; + topicResult.statusMessage = topicResult.err.length ? 'Bad Request' : 'Accepted'; } - data.topicId = topic.id; + // Process the valid publish notifications + const validPublishTopics = data.publishTopics.filter((topicResult) => !topicResult.err.length); + try { + await Promise.all(validPublishTopics.map(async (topicResult) => this.db.topicFetchRequested(dbCtx, topicResult.topicId))); + } catch (e) { + this.logger.error(_scope, 'topicFetchRequest failed', { error: e, ctx, data, requestId }); + throw e; + } + + this.logger.info(_scope, 'request accepted', { ctx, data, requestId }); + + if (data.publishTopics.length === 1) { + const soleTopic = data.publishTopics[0]; + res.statusCode = soleTopic.status; + res.end(Manager._prettyDetails(soleTopic.err, soleTopic.warn).join('\n')); + } else { + res.statusCode = 207; + res.end(Manager.multiPublishContent(ctx, data.publishTopics)); + } + + if (this.options.manager.processImmediately + && validPublishTopics.length) { + try { + await Promise.all(validPublishTopics.map(async (topicResult) => this.communication.topicFetchClaimAndProcessById(dbCtx, topicResult.topicId, requestId))); + } catch (e) { + this.logger.error(_scope, 'topicFetchClaimAndProcessById failed', { data, validPublishTopics, requestId }); + // Don't bother re-throwing, as we've already ended this response. + } + } + } + + + /** + * Annotate any encountered issues. + * @param {String[]} err + * @param {String[]} warn + * @returns {String[]} + */ + static _prettyDetails(err, warn) { + return [ + ...err.map((entry) => `error: ${entry}`), + ...warn.map((entry) => `warning: ${entry}`), + ]; } @@ -322,15 +430,14 @@ class Manager { await this.db.context(async (dbCtx) => { + // Handle publish requests elsewhere if (data.mode === Enum.Mode.Publish) { - await this._checkPublish(dbCtx, data, warn, err, requestId); - } else { - await this._validateRootData(dbCtx, data, warn, err, requestId); + return this._publishRequest(dbCtx, data, res, ctx); } - const prettyErr = err.map((entry) => `error: ${entry}`); - const prettyWarn = warn.map((entry) => `warning: ${entry}`); - const details = prettyErr.concat(prettyWarn); + await this._validateRootData(dbCtx, data, warn, err, requestId); + + const details = Manager._prettyDetails(err, warn); // Any errors are fatal. Stop and report anything that went wrong. if (err.length) { @@ -339,18 +446,11 @@ class Manager { } // Commit the request for later processing. - let fn, info, id; + let id; try { - if (data.mode === Enum.Mode.Publish) { - fn = 'topicPublish'; - info = await this.db.topicFetchRequested(dbCtx, data.topicId); - id = data.topicId; - } else { - fn = 'verificationInsert'; - id = await this.db.verificationInsert(dbCtx, { ...data, requestId }); - } + id = await this.db.verificationInsert(dbCtx, { ...data, requestId }); } catch (e) { - this.logger.error(_scope, `${fn} failed`, { e, info, data, warn, id, requestId }); + this.logger.error(_scope, 'verificationInsert failed', { error: e, data, warn, id, requestId }); throw e; } @@ -362,15 +462,9 @@ class Manager { if (this.options.manager.processImmediately && id) { try { - if (data.mode === Enum.Mode.Publish) { - fn = 'topicFetchClaimAndProcessById'; - await this.communication.topicFetchClaimAndProcessById(dbCtx, id, requestId); - } else { - fn = 'processVerification'; - await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId); - } + await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId); } catch (e) { - this.logger.error(_scope, `${fn} failed`, { ...data, id, requestId }); + this.logger.error(_scope, 'verificationClaimAndProcessById failed', { ...data, id, requestId }); // Don't bother re-throwing, as we've already ended this response. } } @@ -449,8 +543,81 @@ class Manager { ctx.count = count.count; }); - res.end(this.infoContent(ctx)); - this.logger.info(_scope, 'finished', { ...ctx }); + const content = this.infoContent(ctx); + res.setHeader(Enum.Header.ETag, common.generateETag(undefined, undefined, content)); + res.setHeader(Enum.Header.CacheControl, 'no-cache'); + res.end(content); + this.logger.info(_scope, 'finished', { ctx }); + } + + + /** + * label the bars of the topic update history graph + * @param {Number} index + * @param {Number} value + * @returns {String} + */ + static _historyBarCaption(index, value) { + let when; + switch (index) { + case 0: + when ='today'; + break; + case 1: + when = 'yesterday'; + break; + default: + when = `${index} days ago`; + } + return `${when}, ${value ? value : 'no'} update${value === 1 ? '': 's'}`; + } + + + /** + * GET SVG chart of topic update history + * @param {http.ServerResponse} res + * @param {object} ctx + */ + async getHistorySVG(res, ctx) { + const _scope = _fileScope('getHistorySVG'); + this.logger.debug(_scope, 'called', { ctx }); + + const days = Math.min(parseInt(ctx.queryParams.days) || this.options.manager.publishHistoryDays, 365); + const histOptions = { + title: 'Topic Publish History', + description: 'Updates per Day', + labelZero: '^ Today', + labelX: 'Days Ago', + maxItems: days, + minItems: days, + tickEvery: 7, + barWidth: 25, + barHeight: 40, + labelHeight: 12, + barCaptionFn: Manager._historyBarCaption, + }; + + let publishHistory; + await this.db.context(async (dbCtx) => { + publishHistory = await this.db.topicPublishHistory(dbCtx, ctx.params.topicId, days); + }); + + res.end(Template.histogramSVG(publishHistory, histOptions)); + this.logger.info(_scope, 'finished', { ctx }); + } + + + /** + * Determine if a profile url matches enough of a topic url to describe control over it. + * Topic must match hostname and start with the profile's path. + * @param {URL} profileUrlObj + * @param {URL} topicUrlObj + * @returns {Boolean} + */ + static _profileControlsTopic(profileUrlObj, topicUrlObj) { + const hostnameMatches = profileUrlObj.hostname === topicUrlObj.hostname; + const pathIsPrefix = topicUrlObj.pathname.startsWith(profileUrlObj.pathname); + return hostnameMatches && pathIsPrefix; } @@ -468,8 +635,17 @@ class Manager { }); this.logger.debug(_scope, 'got topics', { topics: ctx.topics }); + // Profile users can only see related topics. + if (ctx.session && ctx.session.authenticatedProfile) { + const profileUrlObj = new URL(ctx.session.authenticatedProfile); + ctx.topics = ctx.topics.filter((topic) => { + const topicUrlObj = new URL(topic.url); + return Manager._profileControlsTopic(profileUrlObj, topicUrlObj); + }); + } + res.end(Template.adminOverviewHTML(ctx, this.options)); - this.logger.info(_scope, 'finished', { ...ctx, topics: ctx.topics.length }) + this.logger.info(_scope, 'finished', { ctx, topics: ctx.topics.length }); } @@ -482,15 +658,32 @@ class Manager { const _scope = _fileScope('getTopicDetails'); this.logger.debug(_scope, 'called', { ctx }); + ctx.publishSpan = 60; // FIXME: configurable const topicId = ctx.params.topicId; + let publishHistory; await this.db.context(async (dbCtx) => { ctx.topic = await this.db.topicGetById(dbCtx, topicId); ctx.subscriptions = await this.db.subscriptionsByTopicId(dbCtx, topicId); + publishHistory = await this.db.topicPublishHistory(dbCtx, topicId, ctx.publishSpan); }); - this.logger.debug(_scope, 'got topic details', { topic: ctx.topic, subscriptions: ctx.subscriptions }); + ctx.publishCount = publishHistory.reduce((a, b) => a + b, 0); + ctx.subscriptionsDelivered = ctx.subscriptions.filter((subscription) => { + return subscription.latestContentDelivered >= ctx.topic.contentUpdated; + }).length; + this.logger.debug(_scope, 'got topic details', { topic: ctx.topic, subscriptions: ctx.subscriptions, updates: ctx.publishCount }); + + // Profile users can only see related topics. + if (ctx.session && ctx.session.authenticatedProfile) { + const profileUrlObj = new URL(ctx.session.authenticatedProfile); + const topicUrlObj = new URL(ctx.topic.url); + if (!Manager._profileControlsTopic(profileUrlObj, topicUrlObj)) { + ctx.topic = null; + ctx.subscriptions = []; + } + } res.end(Template.adminTopicDetailsHTML(ctx, this.options)); - this.logger.info(_scope, 'finished', { ...ctx, subscriptions: ctx.subscriptions.length, topic: ctx.topic.id }); + this.logger.info(_scope, 'finished', { ctx, subscriptions: ctx.subscriptions.length, topic: ctx.topic && ctx.topic.id || ctx.topic }); } @@ -518,6 +711,8 @@ class Manager { await this.db.topicDeleted(txCtx, topicId); res.end(); this.logger.info(_scope, 'topic set deleted', { ctx, topicId }); + // Attempt to remove from db if no active subscriptions. + await this.db.topicPendingDelete(txCtx, topicId); return; } @@ -662,11 +857,13 @@ class Manager { * @param {object} ctx */ async processTasks(res, ctx) { - const _scope = _fileScope('getTopicDetails'); + const _scope = _fileScope('processTasks'); this.logger.debug(_scope, 'called', { ctx }); // N.B. no await on this - this.communication.worker.process(); + this.communication.worker.process().catch((e) => { + this.logger.error(_scope, 'failed', { error: e, ctx }); + }); res.end(); this.logger.info(_scope, 'invoked worker process', { ctx });