From: Justin Wind Date: Sat, 28 Aug 2021 17:04:08 +0000 (-0700) Subject: Merge branch 'v1.2-dev' as v1.2.0 X-Git-Tag: v1.2.0 X-Git-Url: http://git.squeep.com/?p=websub-hub;a=commitdiff_plain;h=085b55f507dedc16016bb491d520c556acd60643;hp=777488fff1f833e2bb699c700f38aa7374a80735 Merge branch 'v1.2-dev' as v1.2.0 --- diff --git a/CHANGELOG.md b/CHANGELOG.md index 966dede..1b458a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,14 @@ Releases and notable changes to this project are documented here. ## [Unreleased] +## [v1.2.0] - 2021-08-28 + +### Added + +- Accept multiple topics in publish requests. +- Expired subscription entries are removed from the database when their topics are updated. +- Topics which have been marked deleted are removed from the database after all subscribers have been notified. + ## [v1.1.5] - 2021-08-23 ### Fixed @@ -59,7 +67,8 @@ Releases and notable changes to this project are documented here. --- -[Unreleased]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=HEAD;hp=v1.1.5 +[Unreleased]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=HEAD;hp=v1.2.0 +[v1.2.0]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=v1.2.0;hp=v1.1.5 [v1.1.5]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=v1.1.5;hp=v1.1.4 [v1.1.4]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=v1.1.4;hp=v1.1.3 [v1.1.3]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=v1.1.3;hp=v1.1.2 diff --git a/package-lock.json b/package-lock.json index c0aa159..002506d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "websub-hub", - "version": "1.1.5", + "version": "1.2.0", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -1146,6 +1146,13 @@ "domelementtype": "^2.0.1", "domhandler": "^4.2.0", "entities": "^2.0.0" + }, + "dependencies": { + "entities": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/entities/-/entities-2.2.0.tgz", + "integrity": "sha512-p92if5Nz619I0w+akJrLZH0MX0Pb5DX39XOwQTtXSdQQOaYH03S1uIQp4mhOZtAXrxq4ViO67YTiLBo2638o9A==" + } } }, "domelementtype": { @@ -1201,9 +1208,9 @@ } }, "entities": { - "version": "2.2.0", - "resolved": "https://registry.npmjs.org/entities/-/entities-2.2.0.tgz", - "integrity": "sha512-p92if5Nz619I0w+akJrLZH0MX0Pb5DX39XOwQTtXSdQQOaYH03S1uIQp4mhOZtAXrxq4ViO67YTiLBo2638o9A==" + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/entities/-/entities-3.0.1.tgz", + "integrity": "sha512-WiyBqoomrwMdFG1e0kqvASYfnlb0lp8M5o5Fw2OFq1hNZxxcNk8Ik0Xm7LxzBhuidnZB/UtBqVCgUz3kBOP51Q==" }, "es6-error": { "version": "4.1.1", @@ -1764,14 +1771,14 @@ "dev": true }, "htmlparser2": { - "version": "6.1.0", - "resolved": "https://registry.npmjs.org/htmlparser2/-/htmlparser2-6.1.0.tgz", - "integrity": "sha512-gyyPk6rgonLFEDGoeRgQNaEUvdJ4ktTmmUh/h2t7s+M8oPpIPxgNACWa+6ESR57kXstwqPiCut0V8NRpcwgU7A==", + "version": "7.0.0", + "resolved": "https://registry.npmjs.org/htmlparser2/-/htmlparser2-7.0.0.tgz", + "integrity": "sha512-IhdltX9BWhYQft4UPA92jFasNajskja0om6vU0DaIEL4OseCg5zE+mHAMr51AT89TbzzECrQWJ4CZ5NVYTPlKw==", "requires": { "domelementtype": "^2.0.1", "domhandler": "^4.0.0", "domutils": "^2.5.2", - "entities": "^2.0.0" + "entities": "^3.0.1" } }, "https-proxy-agent": { @@ -2229,9 +2236,9 @@ "integrity": "sha512-gKLcREMhtuZRwRAfqP3RFW+TK4JqApVBtOIftVgjuABpAtpxhPGaDcfvbhNvD0B8iD1oUr/txX35NjcaY6Ns/A==" }, "mocha": { - "version": "9.0.3", - "resolved": "https://registry.npmjs.org/mocha/-/mocha-9.0.3.tgz", - "integrity": "sha512-hnYFrSefHxYS2XFGtN01x8un0EwNu2bzKvhpRFhgoybIvMaOkkL60IVPmkb5h6XDmUl4IMSB+rT5cIO4/4bJgg==", + "version": "9.1.0", + "resolved": "https://registry.npmjs.org/mocha/-/mocha-9.1.0.tgz", + "integrity": "sha512-Kjg/XxYOFFUi0h/FwMOeb6RoroiZ+P1yOfya6NK7h3dNhahrJx1r2XIT3ge4ZQvJM86mdjNA+W5phqRQh7DwCg==", "dev": true, "requires": { "@ungap/promise-all-settled": "1.1.2", diff --git a/package.json b/package.json index f6971cd..a16ad0e 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "websub-hub", - "version": "1.1.5", + "version": "1.2.0", "description": "A WebSub Hub server implementation.", "main": "server.js", "scripts": { @@ -38,7 +38,7 @@ "axios": "^0.21.1", "better-sqlite3": "^7.4.3", "feedparser": "^2.2.10", - "htmlparser2": "^6.1.0", + "htmlparser2": "^7.0.0", "iconv": "^3.0.0", "pg-promise": "^10.11.0" }, @@ -47,7 +47,7 @@ "eslint-plugin-node": "^11.1.0", "eslint-plugin-security": "^1.4.0", "eslint-plugin-sonarjs": "^0.10.0", - "mocha": "^9.0.3", + "mocha": "^9.1.0", "mocha-steps": "^1.3.0", "nyc": "^15.1.0", "pre-commit": "^1.2.2", diff --git a/src/common.js b/src/common.js index 0cacc3b..55a0807 100644 --- a/src/common.js +++ b/src/common.js @@ -26,6 +26,22 @@ const validHash = (algorithm) => getHashes() .filter((h) => h.match(/^sha[0-9]+$/)) .includes(algorithm); + +/** + * Return an array containing x if x is not an array. + * @param {*} x + */ +const ensureArray = (x) => { + if (x === undefined) { + return []; + } + if (!Array.isArray(x)) { + return Array(x); + } + return x; +}; + + /** * Recursively freeze an object. * @param {Object} o @@ -140,6 +156,7 @@ module.exports = { arrayChunk, attemptRetrySeconds, axiosResponseLogData, + ensureArray, freezeDeep, logTruncate, randomBytesAsync, diff --git a/src/communication.js b/src/communication.js index 345714f..097da63 100644 --- a/src/communication.js +++ b/src/communication.js @@ -223,6 +223,7 @@ class Communication { } if (!topic.isActive) { + // These should be filtered out when selecting verification tasks to process. this.logger.debug(_scope, 'topic not active, skipping verification', { verification, requestId }); await this.db.verificationRelease(dbCtx, verificationId); return; @@ -328,11 +329,19 @@ class Communication { case Enum.Mode.Unsubscribe: if (verificationAccepted) { await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId); + if (topic.isDeleted) { + // Remove a deleted topic after the last subscription is notified. + await this.db.topicPendingDelete(txCtx, topic.id); + } } break; case Enum.Mode.Denied: await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId); + if (topic.isDeleted) { + // Remove a deleted topic after he last subscription is notified. + await this.db.topicPendingDelete(txCtx, topic.id); + } break; default: @@ -431,6 +440,9 @@ class Communication { throw new Errors.InternalInconsistencyError('no such topic id'); } + // Cull any expired subscriptions + await this.db.subscriptionDeleteExpired(dbCtx, topicId); + logInfoData.url = topicId.url; if (topic.isDeleted) { @@ -479,13 +491,15 @@ class Communication { const validHub = await this.linkHelper.validHub(topic.url, response.headers, response.data); if (!validHub) { - this.logger.debug(_scope, 'retrieved topic does not list us as hub', { logInfoData }); + this.logger.info(_scope, 'retrieved topic does not list us as hub', { logInfoData }); if (this.options.communication.strictTopicHubLink) { await this.db.transaction(dbCtx, async (txCtx) => { // Set as deleted and set content_updated so subscriptions are notified. await this.db.topicDeleted(txCtx, topicId); await this.db.topicFetchComplete(txCtx, topicId); }); + // Attempt to remove from db, if no active subscriptions. + await this.db.topicPendingDelete(dbCtx, topicId); return; } } diff --git a/src/db/base.js b/src/db/base.js index 95c9010..de0cd44 100644 --- a/src/db/base.js +++ b/src/db/base.js @@ -351,6 +351,16 @@ class Database { } + /** + * Remove any expired subscriptions to a topic. + * @param {*} dbCtx + * @param {*} topicId + */ + async subscriptionDeleteExpired(dbCtx, topicId) { + this._notImplemented('subscriptionDeleteExpired', arguments); + } + + /** * Claim subscriptions needing content updates attempted. * @param {*} dbCtx @@ -533,6 +543,7 @@ class Database { this._notImplemented('topicGetAll', arguments); } + /** * Get topic data, without content. * @param {*} dbCtx @@ -563,14 +574,15 @@ class Database { this._notImplemented('topicGetContentById', arguments); } - // /** - // * Call after an unsubscribe, to check if a topic is awaiting deletion, and that - // * was the last subscription belaying it. - // * @param {String|Integer} data topic url or id - // */ - // async topicPendingDelete(dbCtx, data) { - // this._notImplemented('topicPendingDelete', arguments); - // } + + /** + * Attempt to delete a topic, which must be set isDeleted, if there + * are no more subscriptions belaying its removal. + * @param {*} topicId + */ + async topicPendingDelete(dbCtx, topicId) { + this._notImplemented('topicPendingDelete', arguments); + } /** diff --git a/src/db/postgres/index.js b/src/db/postgres/index.js index 2559097..213fa10 100644 --- a/src/db/postgres/index.js +++ b/src/db/postgres/index.js @@ -421,6 +421,21 @@ class DatabasePostgres extends Database { } + async subscriptionDeleteExpired(dbCtx, topicId) { + const _scope = _fileScope('subscriptionDeleteExpired'); + this.logger.debug(_scope, 'called', { topicId }); + + try { + const result = await dbCtx.result(this.statement.subscriptionDeleteExpired, { topicId }); + this.logger.debug(_scope, 'success', { topicId, deleted: result.rowCount }); + return this._engineInfo(result); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, topicId }); + throw e; + } + } + + async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) { const _scope = _fileScope('subscriptionDeliveryClaim'); this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant }); @@ -804,6 +819,37 @@ class DatabasePostgres extends Database { } + async topicPendingDelete(dbCtx, topicId) { + const _scope = _fileScope('topicPendingDelete'); + this.logger.debug(_scope, 'called', { topicId }); + + try { + await dbCtx.txIf(async (txCtx) => { + const topic = await txCtx.one(this.statement.topicGetById, { topicId }); + if (!topic.isDeleted) { + this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId }); + return; + } + + const { count: subscriberCount } = await txCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl: topic.url }); + if (subscriberCount) { + this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount }); + return; + } + + const result = await txCtx.result(this.statement.topicDeleteById, { topicId }); + if (result.rowCount !== 1) { + throw new DBErrors.UnexpectedResult('did not delete topic'); + } + }); + this.logger.debug(_scope, 'success', { topicId }); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, topicId }); + throw e; + } + } + + async topicSet(dbCtx, data) { const _scope = _fileScope('topicSet'); this.logger.debug(_scope, 'called', data); diff --git a/src/db/postgres/sql/subscription-delete-expired.sql b/src/db/postgres/sql/subscription-delete-expired.sql new file mode 100644 index 0000000..d0f96e7 --- /dev/null +++ b/src/db/postgres/sql/subscription-delete-expired.sql @@ -0,0 +1,4 @@ +-- +DELETE FROM subscription +WHERE topic_id = $(topicId) AND expires < now() + diff --git a/src/db/postgres/sql/topic-delete-by-id.sql b/src/db/postgres/sql/topic-delete-by-id.sql new file mode 100644 index 0000000..1378016 --- /dev/null +++ b/src/db/postgres/sql/topic-delete-by-id.sql @@ -0,0 +1,4 @@ +-- +DELETE FROM topic +WHERE id = $(topicId) + diff --git a/src/db/sqlite/index.js b/src/db/sqlite/index.js index fba4e7c..07d6633 100644 --- a/src/db/sqlite/index.js +++ b/src/db/sqlite/index.js @@ -359,6 +359,21 @@ class DatabaseSQLite extends Database { } + subscriptionDeleteExpired(dbCtx, topicId) { + const _scope = _fileScope('subscriptionDeleteExpired'); + this.logger.debug(_scope, 'called', { topicId }); + + try { + const result = this.statement.subscriptionDeleteExpired.run({ topicId }); + this.logger.debug(_scope, 'success', { topicId, deleted: result.changes }); + return this._engineInfo(result); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, topicId }); + throw e; + } + } + + subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) { const _scope = _fileScope('subscriptionDeliveryClaim'); this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant }); @@ -768,6 +783,37 @@ class DatabaseSQLite extends Database { } + topicPendingDelete(dbCtx, topicId) { + const _scope = _fileScope('topicPendingDelete'); + this.logger.debug(_scope, 'called', { topicId }); + + try { + this.db.transaction(() => { + const topic = this.statement.topicGetById.get({ topicId }); + if (!topic.isDeleted) { + this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId }); + return; + } + + const { count: subscriberCount } = this.statement.subscriptionCountByTopicUrl.get({ topicUrl: topic.url }); + if (subscriberCount) { + this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount }); + return; + } + + const result = this.statement.topicDeleteById.run({ topicId }); + if (result.changes !== 1) { + throw new DBErrors.UnexpectedResult('did not delete topic'); + } + })(); + this.logger.debug(_scope, 'success', { topicId }); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, topicId }); + throw e; + } + } + + topicSet(dbCtx, data) { const _scope = _fileScope('topicSet'); this.logger.debug(_scope, 'called', data); diff --git a/src/db/sqlite/sql/subscription-delete-expired.sql b/src/db/sqlite/sql/subscription-delete-expired.sql new file mode 100644 index 0000000..fae9d2c --- /dev/null +++ b/src/db/sqlite/sql/subscription-delete-expired.sql @@ -0,0 +1,3 @@ +-- +DELETE FROM subscription +WHERE topic_id = :topicId AND expires < strftime('%s', 'now') diff --git a/src/db/sqlite/sql/topic-delete-by-id.sql b/src/db/sqlite/sql/topic-delete-by-id.sql new file mode 100644 index 0000000..a66364a --- /dev/null +++ b/src/db/sqlite/sql/topic-delete-by-id.sql @@ -0,0 +1,4 @@ +-- +DELETE FROM topic +WHERE id = :topicId + diff --git a/src/manager.js b/src/manager.js index fddc8e4..cae9e74 100644 --- a/src/manager.js +++ b/src/manager.js @@ -144,7 +144,7 @@ class Manager { if (data.topic) { topic = await this.db.topicGetByUrl(dbCtx, data.topic); - if (!topic && this.options.manager.publicHub) { + if (!topic && this._newTopicCreationAllowed()) { this.logger.info(_scope, 'new topic from subscribe request', { data, requestId }); try { @@ -263,43 +263,165 @@ class Manager { /** - * Check that a publish request topic is valid and exists, - * and if it is, add topicId to data. - * For a public publish request, create topic if not exists. + * Determine if a topic url is allowed to be created. + * In the future, this may be more complicated. + * @returns {Boolean} + */ + _newTopicCreationAllowed() { + return this.options.manager.publicHub; + } + + + /** + * Check that a publish request's topic(s) are valid and exist, + * returning an array with the results for each. + * For a public-hub publish request, creates topics if they do not exist. * @param {*} dbCtx * @param {RootData} data * @param {String[]} warn * @param {String[]} err * @param {String} requestId */ - async _checkPublish(dbCtx, data, warn, err, requestId) { + async _publishTopics(dbCtx, data, requestId) { const _scope = _fileScope('_checkPublish'); - const publishUrl = data.url || data.topic; + // Publish requests may include multiple topics, consider them all, but deduplicate. + const publishUrls = Array.from(new Set([ + ...common.ensureArray(data.url), + ...common.ensureArray(data.topic), + ])); + + // Map the requested topics to their ids, creating if necessary. + return Promise.all(publishUrls.map(async (url) => { + const result = { + url, + warn: [], + err: [], + topicId: undefined, + }; + let topic = await this.db.topicGetByUrl(dbCtx, url); + if (!topic && this._newTopicCreationAllowed()) { + try { + new URL(url); + } catch (e) { + result.err.push('invalid topic url (failed to parse url)'); + return result; + } + await this.db.topicSet(dbCtx, { + // TODO: accept a publisherValidationUrl parameter + url, + }); + topic = await this.db.topicGetByUrl(dbCtx, url); + this.logger.info(_scope, 'new topic from publish request', { url, requestId }); + } + if (!topic || topic.isDeleted) { + result.err.push('topic not supported'); + return result; + } + result.topicId = topic.id; + return result; + })); + } - let topic = await this.db.topicGetByUrl(dbCtx, publishUrl); - if (!topic && this.options.manager.publicHub) { - this.logger.info(_scope, 'new topic from publish request', { data, requestId }); - try { - new URL(publishUrl); - } catch (e) { - err.push('invalid topic url (failed to parse url)'); - return; + /** + * Render response for multi-topic publish requests. + * @param {Object[]} publishTopics + */ + static multiPublishContent(ctx, publishTopics) { + const responses = publishTopics.map((topic) => ({ + href: topic.url, + status: topic.status, + statusMessage: topic.statusMessage, + errors: topic.err, + warnings: topic.warn, + })); + switch (ctx.responseType) { + case Enum.ContentType.ApplicationJson: + return JSON.stringify(responses); + + case Enum.ContentType.TextPlain: + default: { + const textResponses = responses.map((response) => { + const details = Manager._prettyDetails(response.errors, response.warnings); + const textDetails = (details.length ? '\n' : '') + details.map((d) => `\t${d}`).join('\n'); + return `${response.href} [${response.status} ${response.statusMessage}]${textDetails}`; + }); + return textResponses.join('\n----\n'); } + } + } + - await this.db.topicSet(dbCtx, { - url: publishUrl, - }); - topic = await this.db.topicGetByUrl(dbCtx, publishUrl); + /** + * Process a publish request. + * @param {*} dbCtx + * @param {Object} data + * @param {http.ServerResponse} res + * @param {Object} ctx + */ + async _publishRequest(dbCtx, data, res, ctx) { + const _scope = _fileScope('_parsePublish'); + this.logger.debug(_scope, 'called', { data }); + + const requestId = ctx.requestId; + + // Parse and validate all the topics in the request. + data.publishTopics = await this._publishTopics(dbCtx, data, requestId); + if (!data.publishTopics || !data.publishTopics.length) { + const details = Manager._prettyDetails(['no valid topic urls to publish'], []); + throw new ResponseError(Enum.ErrorResponse.BadRequest, details); } - if (!topic || topic.isDeleted) { - err.push('not a supported topic'); - return; + // Set status per topic + for (const topicResult of data.publishTopics) { + topicResult.status = topicResult.err.length ? 400 : 202; + topicResult.statusMessage = topicResult.err.length ? 'Bad Request' : 'Accepted'; } - data.topicId = topic.id; + // Process the valid publish notifications + const validPublishTopics = data.publishTopics.filter((topicResult) => !topicResult.err.length); + try { + await Promise.all(validPublishTopics.map(async (topicResult) => this.db.topicFetchRequested(dbCtx, topicResult.topicId))); + } catch (e) { + this.logger.error(_scope, 'topicFetchRequest failed', { error: e, ctx, data, requestId }); + throw e; + } + + this.logger.info(_scope, 'request accepted', { ctx, data, requestId }); + + if (data.publishTopics.length === 1) { + const soleTopic = data.publishTopics[0]; + res.statusCode = soleTopic.status; + res.end(Manager._prettyDetails(soleTopic.err, soleTopic.warn).join('\n')); + } else { + res.statusCode = 207; + res.end(Manager.multiPublishContent(ctx, data.publishTopics)); + } + + if (this.options.manager.processImmediately + && validPublishTopics.length) { + try { + await Promise.all(validPublishTopics.map(async (topicResult) => this.communication.topicFetchClaimAndProcessById(dbCtx, topicResult.topicId, requestId))); + } catch (e) { + this.logger.error(_scope, 'topicFetchClaimAndProcessById failed', { data, validPublishTopics, requestId }); + // Don't bother re-throwing, as we've already ended this response. + } + } + } + + + /** + * Annotate any encountered issues. + * @param {String[]} err + * @param {String[]} warn + * @returns {String[]} + */ + static _prettyDetails(err, warn) { + return [ + ...err.map((entry) => `error: ${entry}`), + ...warn.map((entry) => `warning: ${entry}`), + ]; } @@ -322,15 +444,14 @@ class Manager { await this.db.context(async (dbCtx) => { + // Handle publish requests elsewhere if (data.mode === Enum.Mode.Publish) { - await this._checkPublish(dbCtx, data, warn, err, requestId); - } else { - await this._validateRootData(dbCtx, data, warn, err, requestId); + return this._publishRequest(dbCtx, data, res, ctx); } - const prettyErr = err.map((entry) => `error: ${entry}`); - const prettyWarn = warn.map((entry) => `warning: ${entry}`); - const details = prettyErr.concat(prettyWarn); + await this._validateRootData(dbCtx, data, warn, err, requestId); + + const details = Manager._prettyDetails(err, warn); // Any errors are fatal. Stop and report anything that went wrong. if (err.length) { @@ -339,18 +460,11 @@ class Manager { } // Commit the request for later processing. - let fn, info, id; + let id; try { - if (data.mode === Enum.Mode.Publish) { - fn = 'topicFetchRequested'; - info = await this.db.topicFetchRequested(dbCtx, data.topicId); - id = data.topicId; - } else { - fn = 'verificationInsert'; - id = await this.db.verificationInsert(dbCtx, { ...data, requestId }); - } + id = await this.db.verificationInsert(dbCtx, { ...data, requestId }); } catch (e) { - this.logger.error(_scope, `${fn} failed`, { e, info, data, warn, id, requestId }); + this.logger.error(_scope, 'verificationInsert failed', { error: e, data, warn, id, requestId }); throw e; } @@ -362,15 +476,9 @@ class Manager { if (this.options.manager.processImmediately && id) { try { - if (data.mode === Enum.Mode.Publish) { - fn = 'topicFetchClaimAndProcessById'; - await this.communication.topicFetchClaimAndProcessById(dbCtx, id, requestId); - } else { - fn = 'verificationClaimAndProcessById'; - await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId); - } + await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId); } catch (e) { - this.logger.error(_scope, `${fn} failed`, { ...data, id, requestId }); + this.logger.error(_scope, 'verificationClaimAndProcessById failed', { ...data, id, requestId }); // Don't bother re-throwing, as we've already ended this response. } } @@ -518,6 +626,8 @@ class Manager { await this.db.topicDeleted(txCtx, topicId); res.end(); this.logger.info(_scope, 'topic set deleted', { ctx, topicId }); + // Attempt to remove from db if no active subscriptions. + await this.db.topicPendingDelete(txCtx, topicId); return; } diff --git a/test/src/common.js b/test/src/common.js index b64ecb4..a9b228c 100644 --- a/test/src/common.js +++ b/test/src/common.js @@ -160,4 +160,21 @@ describe('Common', function () { }); }); // validHash + describe('ensureArray', function () { + it('returns empty array for no data', function () { + const result = common.ensureArray(); + assert.deepStrictEqual(result, []); + }); + it('returns same array passed in', function () { + const expected = [1, 2, 3, 'foo']; + const result = common.ensureArray(expected); + assert.deepStrictEqual(result, expected); + }); + it('returns array containing non-array data', function () { + const data = 'bar'; + const result = common.ensureArray(data); + assert.deepStrictEqual(result, [data]); + }); + }); // ensureArray + }); // Common diff --git a/test/src/communication.js b/test/src/communication.js index ca5f34e..960a0f9 100644 --- a/test/src/communication.js +++ b/test/src/communication.js @@ -405,6 +405,23 @@ describe('Communication', function () { assert(communication.db.verificationComplete.called); }); + it('unsubscription from deleted topic deletes topic', async function () { + communication.db.verificationGetById.restore(); + verification.mode = 'unsubscribe'; + sinon.stub(communication.db, 'verificationGetById').resolves(verification); + communication.db.topicGetById.restore(); + sinon.stub(communication.db, 'topicGetById').resolves({ + ...topic, + isDeleted: true, + }); + + await communication.verificationProcess(dbCtx, callback, topicId, requestId); + + assert(communication.db.subscriptionDelete.called); + assert(communication.db.verificationComplete.called); + assert(communication.db.topicPendingDelete.called); + }); + it('unsubscription denial succeeds', async function () { communication.db.verificationGetById.restore(); verification.mode = 'unsubscribe'; diff --git a/test/src/db/integration.js b/test/src/db/integration.js index e6f632c..8d5b615 100644 --- a/test/src/db/integration.js +++ b/test/src/db/integration.js @@ -114,6 +114,7 @@ describe('Database Integration', function () { }); // Authentication describe('Topic', function () { + let anotherTopicId; step('requires data', async function () { try { await db.context(async (dbCtx) => { @@ -222,7 +223,7 @@ describe('Database Integration', function () { step('deletes a topic', async function () { await db.context(async (dbCtx) => { const result = await db.topicSet(dbCtx, testData.anotherTopicSet); - const anotherTopicId = result.lastInsertRowid; + anotherTopicId = result.lastInsertRowid; await db.topicDeleted(dbCtx, anotherTopicId); const topic = await db.topicGetById(dbCtx, anotherTopicId); assert.strictEqual(topic.isDeleted, true); @@ -231,7 +232,7 @@ describe('Database Integration', function () { step('update un-deletes a topic', async function () { await db.context(async (dbCtx) => { const result = await db.topicSet(dbCtx, testData.anotherTopicSet); - const anotherTopicId = result.lastInsertRowid; + assert.strictEqual(result.lastInsertRowid, anotherTopicId); const topic = await db.topicGetById(dbCtx, anotherTopicId); assert.strictEqual(topic.isDeleted, false); }); @@ -242,6 +243,15 @@ describe('Database Integration', function () { assert(topics.length); }); }); + // pending delete of deleted topic with no subscriptions + step('really deletes unsubscribed deleted topic', async function() { + await db.context(async (dbCtx) => { + await db.topicDeleted(dbCtx, anotherTopicId); + await db.topicPendingDelete(dbCtx, anotherTopicId); + const topic = await db.topicGetById(dbCtx, anotherTopicId); + assert(!topic); + }); + }); }); // Topic describe('Subscription', function () { @@ -372,6 +382,28 @@ describe('Database Integration', function () { assert(!subscription); }); }); + step('create expired subscription', async function () { + const data = { + ...testData.subscriptionUpsert, + secret: 'newSecret', + topicId, + leaseSeconds: -1, + }; + await db.context(async (dbCtx) => { + const result = await db.subscriptionUpsert(dbCtx, data); + assert(result.lastInsertRowid); + assert.notStrictEqual(result.lastInsertRowid, subscriptionId); + subscriptionId = result.lastInsertRowid; + assert.strictEqual(result.changes, 1); + }); + }); + step('delete expired subscriptions', async function() { + await db.context(async (dbCtx) => { + await db.subscriptionDeleteExpired(dbCtx, topicId) + const subscription = await db.subscriptionGet(dbCtx, testData.subscriptionUpsert.callback, topicId); + assert(!subscription); + }); + }); }); // Subscription describe('Verification', function () { diff --git a/test/src/db/postgres.js b/test/src/db/postgres.js index ef47905..0f037d6 100644 --- a/test/src/db/postgres.js +++ b/test/src/db/postgres.js @@ -557,7 +557,7 @@ describe('DatabasePostgres', function () { changes: 1, lastInsertRowid: undefined, duration: 10, - } + }; sinon.stub(db.db, 'result').resolves(dbResult); const result = await db.subscriptionDelete(dbCtx, callback, topicId); assert.deepStrictEqual(result, expected); @@ -574,6 +574,34 @@ describe('DatabasePostgres', function () { }); }); // subscriptionDelete + describe('subscriptionDeleteExpired', function () { + it('success', async function () { + const dbResult = { + rowCount: 1, + rows: [], + duration: 10, + }; + const expected = { + changes: 1, + lastInsertRowid: undefined, + duration: 10, + }; + sinon.stub(db.db, 'result').resolves(dbResult); + const result = await db.subscriptionDeleteExpired(dbCtx, topicId); + assert.deepStrictEqual(result, expected); + }); + it('failure', async function() { + const expected = new Error(); + sinon.stub(db.db, 'result').rejects(expected); + try { + await db.subscriptionDeleteExpired(dbCtx, topicId); + assert.fail(noExpectedException); + } catch (e) { + assert.deepStrictEqual(e, expected); + } + }); + }); + describe('subscriptionDeliveryClaim', function () { it('success', async function() { const dbResult = [ @@ -1214,6 +1242,69 @@ describe('DatabasePostgres', function () { }); }); // topicGetContentById + describe('topicPendingDelete', function () { + beforeEach(function () { + sinon.stub(db.db, 'one'); + sinon.stub(db.db, 'result'); + }); + it('success', async function () { + db.db.one.onCall(0).resolves({ + id: topicId, + isDeleted: true, + }).onCall(1).resolves({ + count: 0, + }); + const dbResult = { + rowCount: 1, + rows: [], + duration: 10, + }; + db.db.result.resolves(dbResult); + await db.topicPendingDelete(dbCtx, topicId); + assert(db.db.result.called); + }); + it('does not delete non-deleted topic', async function () { + db.db.one.onCall(0).resolves({ + id: topicId, + isDeleted: false, + }).onCall(1).resolves({ + count: 0, + }); + await db.topicPendingDelete(dbCtx, topicId); + assert(!db.db.result.called); + }); + it('does not delete topic with active subscriptions', async function () { + db.db.one.onCall(0).resolves({ + id: topicId, + isDeleted: true, + }).onCall(1).resolves({ + count: 10, + }); + await db.topicPendingDelete(dbCtx, topicId); + assert(!db.db.result.called); + }); + it('covers no deletion', async function () { + db.db.one.onCall(0).resolves({ + id: topicId, + isDeleted: true, + }).onCall(1).resolves({ + count: 0, + }); + const dbResult = { + rowCount: 0, + rows: [], + duration: 10, + }; + db.db.result.resolves(dbResult); + try { + await db.topicPendingDelete(dbCtx, topicId); + assert.fail(noExpectedException); + } catch (e) { + assert(e instanceof DBErrors.UnexpectedResult); + } + }); + }); + describe('topicSet', function () { let data; beforeEach(function () { diff --git a/test/src/db/sqlite.js b/test/src/db/sqlite.js index 0c96df3..370d559 100644 --- a/test/src/db/sqlite.js +++ b/test/src/db/sqlite.js @@ -404,8 +404,34 @@ describe('DatabaseSQLite', function () { }); }); // subscriptionDelete + describe('subscriptionDeleteExpired', function () { + it('success', async function () { + const dbResult = { + changes: 1, + lastInsertRowid: undefined, + }; + const expected = { + changes: 1, + lastInsertRowid: undefined, + }; + sinon.stub(db.statement.subscriptionDeleteExpired, 'run').returns(dbResult); + const result = await db.subscriptionDeleteExpired(dbCtx, topicId); + assert.deepStrictEqual(result, expected); + }); + it('failure', async function () { + const expected = new Error(); + sinon.stub(db.statement.subscriptionDeleteExpired, 'run').throws(expected); + try { + await db.subscriptionDeleteExpired(dbCtx, topicId); + assert.fail(noExpectedException); + } catch (e) { + assert.deepStrictEqual(e, expected); + } + }); + }); + describe('subscriptionDeliveryClaim', function () { - it('success', async function() { + it('success', async function () { const dbAllResult = [ { id: 'c2e254c5-aa6e-4a8f-b1a1-e474b07392bb', @@ -1021,6 +1047,76 @@ describe('DatabaseSQLite', function () { }); }); // topicGetContentById + describe('topicPendingDelete', function () { + beforeEach(function () { + sinon.stub(db.statement.topicGetById, 'get'); + sinon.stub(db.statement.subscriptionCountByTopicUrl, 'get'); + sinon.stub(db.statement.topicDeleteById, 'run'); + }); + it('success', async function () { + db.statement.topicGetById.get.returns({ + id: topicId, + isDeleted: true, + }); + db.statement.subscriptionCountByTopicUrl.get.returns({ + count: 0, + }); + db.statement.topicDeleteById.run.returns({ + changes: 1, + }); + db.topicPendingDelete(dbCtx, topicId); + assert(db.statement.topicDeleteById.run.called); + }); + it('does not delete non-deleted topic', async function () { + db.statement.topicGetById.get.returns({ + id: topicId, + isDeleted: false, + }); + db.statement.subscriptionCountByTopicUrl.get.returns({ + count: 0, + }); + db.statement.topicDeleteById.run.returns({ + changes: 1, + }); + db.topicPendingDelete(dbCtx, topicId); + assert(!db.statement.topicDeleteById.run.called); + }); + it('does not delete topic with active subscriptions', async function () { + db.statement.topicGetById.get.returns({ + id: topicId, + isDeleted: true, + }); + db.statement.subscriptionCountByTopicUrl.get.returns({ + count: 10, + }); + db.statement.topicDeleteById.run.returns({ + changes: 1, + }); + db.topicPendingDelete(dbCtx, topicId); + assert(!db.statement.topicDeleteById.run.called); + }); + it('covers no deletion', async function () { + db.statement.topicGetById.get.returns({ + id: topicId, + isDeleted: true, + }); + db.statement.subscriptionCountByTopicUrl.get.returns({ + count: 0, + }); + db.statement.topicDeleteById.run.returns({ + changes: 0, + }); + try { + db.topicPendingDelete(dbCtx, topicId); + assert.fail(noExpectedException); + + } catch (e) { + assert(e instanceof DBErrors.UnexpectedResult); + } + assert(db.statement.topicDeleteById.run.called); + }); + }); + describe('topicSet', function () { let data; beforeEach(function () { diff --git a/test/src/manager.js b/test/src/manager.js index 7870a55..89e3d65 100644 --- a/test/src/manager.js +++ b/test/src/manager.js @@ -37,6 +37,7 @@ describe('Manager', function () { manager = new Manager(stubLogger, stubDb, options); sinon.stub(manager.communication, 'verificationProcess'); sinon.stub(manager.communication, 'topicFetchProcess'); + sinon.stub(manager.communication, 'topicFetchClaimAndProcessById'); stubDb._reset(); stubLogger._reset(); }); @@ -174,7 +175,7 @@ describe('Manager', function () { await manager.getAdminOverview(res, ctx); assert(res.end.called); }); - }); + }); // getAdminOverview describe('getTopicDetails', function () { it('covers', async function() { @@ -559,13 +560,11 @@ describe('Manager', function () { }); }); // _checkMode - describe('_checkPublish', function () { - let dbCtx, data, warn, err, requestId; + describe('_publishTopics', function () { + let dbCtx, data, requestId; beforeEach(function () { dbCtx = {}; data = {}; - warn = []; - err = []; requestId = 'blah'; }); it('succeeds', async function () { @@ -573,26 +572,29 @@ describe('Manager', function () { id: 222, }); Object.assign(data, testData.validPublishRootData); - await manager._checkPublish(dbCtx, data, warn, err, requestId); - assert.strictEqual(warn.length, 0, 'unexpected warnings length'); - assert.strictEqual(err.length, 0, 'unexpected errors length'); - assert.strictEqual(data.topicId, 222, 'unexpected topic id'); + const topicResults = await manager._publishTopics(dbCtx, data, requestId); + assert.strictEqual(topicResults.length, 1); + assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length'); + assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length'); + assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id'); }); it('fails bad url', async function () { Object.assign(data, testData.validPublishRootData, { topic: 'not_a_url' }); - await manager._checkPublish(dbCtx, data, warn, err, requestId); - assert.strictEqual(err.length, 1, 'unexpected errors length'); - assert.strictEqual(warn.length, 0); + const topicResults = await manager._publishTopics(dbCtx, data, requestId); + assert.strictEqual(topicResults.length, 1); + assert.strictEqual(topicResults[0].err.length, 1, 'unexpected errors length'); + assert.strictEqual(topicResults[0].warn.length, 0); }); it('accepts new public publish topic', async function () { manager.db.topicGetByUrl.onCall(0).resolves().onCall(1).resolves({ id: 222, }); Object.assign(data, testData.validPublishRootData); - await manager._checkPublish(dbCtx, data, warn, err, requestId); - assert.strictEqual(warn.length, 0, 'unexpected warnings length'); - assert.strictEqual(err.length, 0, 'unexpected errors length'); - assert.strictEqual(data.topicId, 222, 'unexpected topic id'); + const topicResults = await manager._publishTopics(dbCtx, data, requestId); + assert.strictEqual(topicResults.length, 1); + assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length'); + assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length'); + assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id'); }); it('does not publish deleted topic', async function () { manager.db.topicGetByUrl.resolves({ @@ -600,12 +602,176 @@ describe('Manager', function () { isDeleted: true, }); Object.assign(data, testData.validPublishRootData); - await manager._checkPublish(dbCtx, data, warn, err, requestId); - assert.strictEqual(warn.length, 0, 'unexpected warnings length'); - assert.strictEqual(err.length, 1, 'unexpected errors length'); - assert.strictEqual(data.topicId, undefined, 'unexpected topic id'); + const topicResults = await manager._publishTopics(dbCtx, data, requestId); + assert.strictEqual(topicResults.length, 1); + assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length'); + assert.strictEqual(topicResults[0].err.length, 1, 'unexpected errors length'); + assert.strictEqual(topicResults[0].topicId, undefined, 'unexpected topic id'); + }); + it('no topics', async function() { + Object.assign(data, testData.validPublishRootData); + delete data.topic; + const topicResults = await manager._publishTopics(dbCtx, data, requestId); + assert.strictEqual(topicResults.length, 0); + }); + it('multiple valid topics', async function () { + manager.db.topicGetByUrl.resolves({ + id: 222, + }); + Object.assign(data, testData.validPublishRootData); + data.url = ['https://example.com/first', 'https://example.com/second']; + data.topic = ['https://example.com/third']; + const topicResults = await manager._publishTopics(dbCtx, data, requestId); + assert.strictEqual(topicResults.length, 3); + assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length'); + assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length'); + assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id'); + assert.strictEqual(topicResults[1].warn.length, 0, 'unexpected warnings length'); + assert.strictEqual(topicResults[1].err.length, 0, 'unexpected errors length'); + assert.strictEqual(topicResults[1].topicId, 222, 'unexpected topic id'); + assert.strictEqual(topicResults[2].warn.length, 0, 'unexpected warnings length'); + assert.strictEqual(topicResults[2].err.length, 0, 'unexpected errors length'); + assert.strictEqual(topicResults[2].topicId, 222, 'unexpected topic id'); + }); + it('mix of valid and invalid topics', async function () { + manager.db.topicGetByUrl.onCall(1).resolves().resolves({ + id: 222, + }); + Object.assign(data, testData.validPublishRootData); + data.url = ['https://example.com/first', 'not a url']; + data.topic = ['https://example.com/third']; + const topicResults = await manager._publishTopics(dbCtx, data, requestId); + assert.strictEqual(topicResults.length, 3); + assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length'); + assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length'); + assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id'); + assert.strictEqual(topicResults[1].warn.length, 0, 'unexpected warnings length'); + assert.strictEqual(topicResults[1].err.length, 1, 'unexpected errors length'); + assert.strictEqual(topicResults[1].topicId, undefined, 'unexpected topic id'); + assert.strictEqual(topicResults[2].warn.length, 0, 'unexpected warnings length'); + assert.strictEqual(topicResults[2].err.length, 0, 'unexpected errors length'); + assert.strictEqual(topicResults[2].topicId, 222, 'unexpected topic id'); + }); + }); // _publishTopics + + describe('_publishRequest', function () { + let dbCtx, data, res, ctx; + beforeEach(function () { + dbCtx = {}; + data = {}; + res = { + end: sinon.stub(), + }; + ctx = {}; + }); + it('requires a topic', async function () { + try { + await manager._publishRequest(dbCtx, data, res, ctx); + assert.fail(noExpectedException); + } catch (e) { + assert(e instanceof Errors.ResponseError); + } + }); + it('processes one topic', async function() { + manager.db.topicGetByUrl.resolves({ + id: 222, + }); + Object.assign(data, testData.validPublishRootData); + manager.db.topicFetchRequested.resolves(); + await manager._publishRequest(dbCtx, data, res, ctx); + assert(manager.db.topicFetchRequested.called); + assert.strictEqual(res.statusCode, 202); + assert(res.end.called); + }); + it('processes mix of valid and invalid topics', async function () { + ctx.responseType = 'application/json'; + manager.db.topicGetByUrl.onCall(1).resolves().resolves({ + id: 222, + }); + Object.assign(data, testData.validPublishRootData); + data.url = ['https://example.com/first', 'not a url']; + data.topic = ['https://example.com/third']; + await manager._publishRequest(dbCtx, data, res, ctx); + assert.strictEqual(res.statusCode, 207); + assert(res.end.called); + }); + it('covers topicFetchRequest failure', async function () { + manager.db.topicGetByUrl.resolves({ + id: 222, + }); + Object.assign(data, testData.validPublishRootData); + const expected = new Error('boo'); + manager.db.topicFetchRequested.rejects(expected); + try { + await manager._publishRequest(dbCtx, data, res, ctx); + assert.fail(noExpectedException); + } catch (e) { + assert.deepStrictEqual(e, expected); + } }); - }); // _checkPublish + it('covers immediate processing error', async function() { + manager.options.manager.processImmediately = true; + manager.db.topicGetByUrl.onCall(0).resolves().onCall(1).resolves({ + id: 222, + }); + manager.communication.topicFetchClaimAndProcessById.rejects(); + Object.assign(data, testData.validPublishRootData); + await manager._publishRequest(dbCtx, data, res, ctx); + assert(manager.db.topicFetchRequested.called); + assert.strictEqual(res.statusCode, 202); + assert(res.end.called); + assert(manager.communication.topicFetchClaimAndProcessById.called) + }); + it('covers no immediate processing', async function() { + manager.options.manager.processImmediately = false; + manager.db.topicGetByUrl.onCall(0).resolves().onCall(1).resolves({ + id: 222, + }); + Object.assign(data, testData.validPublishRootData); + await manager._publishRequest(dbCtx, data, res, ctx); + assert(manager.db.topicFetchRequested.called); + assert.strictEqual(res.statusCode, 202); + assert(res.end.called); + assert(!manager.communication.topicFetchClaimAndProcessById.called) + }); + }); // _publishRequest + + describe('multiPublishContent', function () { + let publishTopics; + beforeEach(function () { + publishTopics = [{ + url: 'https://example.com/first', + warn: [], + err: [], + topicId: 222, + status: 202, + statusMessage: 'Accepted', + }, + { + url: 'not a url', + warn: [], + err: [ 'invalid topic url (failed to parse url)' ], + topicId: undefined, + status: 400, + statusMessage: 'Bad Request', + }]; + }); + it('covers json response', function () { + ctx.responseType = 'application/json'; + const expected = '[{"href":"https://example.com/first","status":202,"statusMessage":"Accepted","errors":[],"warnings":[]},{"href":"not a url","status":400,"statusMessage":"Bad Request","errors":["invalid topic url (failed to parse url)"],"warnings":[]}]'; + const result = Manager.multiPublishContent(ctx, publishTopics); + assert.deepStrictEqual(result, expected); + }); + it('covers text response', function () { + ctx.responseType = 'text/plain'; + const expected = `https://example.com/first [202 Accepted] +---- +not a url [400 Bad Request] +\terror: invalid topic url (failed to parse url)`; + const result = Manager.multiPublishContent(ctx, publishTopics); + assert.deepStrictEqual(result, expected); + }); + }); // multiPublishContent describe('processTasks', function () { it('covers', async function () { diff --git a/test/stub-db.js b/test/stub-db.js index 5ef2422..f257cbf 100644 --- a/test/stub-db.js +++ b/test/stub-db.js @@ -17,6 +17,7 @@ const stubFns = [ 'subscriptionsByTopicId', 'subscriptionCountByTopicUrl', 'subscriptionDelete', + 'subscriptionDeleteExpired', 'subscriptionDeliveryClaim', 'subscriptionDeliveryClaimById', 'subscriptionDeliveryComplete', @@ -36,6 +37,7 @@ const stubFns = [ 'topicGetById', 'topicGetByUrl', 'topicGetContentById', + 'topicPendingDelete', 'topicSet', 'topicSetContent', 'topicUpdate',