## [Unreleased]
+## [v1.2.0] - 2021-08-28
+
+### Added
+
+- Accept multiple topics in publish requests.
+- Expired subscription entries are removed from the database when their topics are updated.
+- Topics which have been marked deleted are removed from the database after all subscribers have been notified.
+
## [v1.1.5] - 2021-08-23
### Fixed
---
-[Unreleased]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=HEAD;hp=v1.1.5
+[Unreleased]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=HEAD;hp=v1.2.0
+[v1.2.0]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=v1.2.0;hp=v1.1.5
[v1.1.5]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=v1.1.5;hp=v1.1.4
[v1.1.4]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=v1.1.4;hp=v1.1.3
[v1.1.3]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=v1.1.3;hp=v1.1.2
{
"name": "websub-hub",
- "version": "1.1.5",
+ "version": "1.2.0",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
"domelementtype": "^2.0.1",
"domhandler": "^4.2.0",
"entities": "^2.0.0"
+ },
+ "dependencies": {
+ "entities": {
+ "version": "2.2.0",
+ "resolved": "https://registry.npmjs.org/entities/-/entities-2.2.0.tgz",
+ "integrity": "sha512-p92if5Nz619I0w+akJrLZH0MX0Pb5DX39XOwQTtXSdQQOaYH03S1uIQp4mhOZtAXrxq4ViO67YTiLBo2638o9A=="
+ }
}
},
"domelementtype": {
}
},
"entities": {
- "version": "2.2.0",
- "resolved": "https://registry.npmjs.org/entities/-/entities-2.2.0.tgz",
- "integrity": "sha512-p92if5Nz619I0w+akJrLZH0MX0Pb5DX39XOwQTtXSdQQOaYH03S1uIQp4mhOZtAXrxq4ViO67YTiLBo2638o9A=="
+ "version": "3.0.1",
+ "resolved": "https://registry.npmjs.org/entities/-/entities-3.0.1.tgz",
+ "integrity": "sha512-WiyBqoomrwMdFG1e0kqvASYfnlb0lp8M5o5Fw2OFq1hNZxxcNk8Ik0Xm7LxzBhuidnZB/UtBqVCgUz3kBOP51Q=="
},
"es6-error": {
"version": "4.1.1",
"dev": true
},
"htmlparser2": {
- "version": "6.1.0",
- "resolved": "https://registry.npmjs.org/htmlparser2/-/htmlparser2-6.1.0.tgz",
- "integrity": "sha512-gyyPk6rgonLFEDGoeRgQNaEUvdJ4ktTmmUh/h2t7s+M8oPpIPxgNACWa+6ESR57kXstwqPiCut0V8NRpcwgU7A==",
+ "version": "7.0.0",
+ "resolved": "https://registry.npmjs.org/htmlparser2/-/htmlparser2-7.0.0.tgz",
+ "integrity": "sha512-IhdltX9BWhYQft4UPA92jFasNajskja0om6vU0DaIEL4OseCg5zE+mHAMr51AT89TbzzECrQWJ4CZ5NVYTPlKw==",
"requires": {
"domelementtype": "^2.0.1",
"domhandler": "^4.0.0",
"domutils": "^2.5.2",
- "entities": "^2.0.0"
+ "entities": "^3.0.1"
}
},
"https-proxy-agent": {
"integrity": "sha512-gKLcREMhtuZRwRAfqP3RFW+TK4JqApVBtOIftVgjuABpAtpxhPGaDcfvbhNvD0B8iD1oUr/txX35NjcaY6Ns/A=="
},
"mocha": {
- "version": "9.0.3",
- "resolved": "https://registry.npmjs.org/mocha/-/mocha-9.0.3.tgz",
- "integrity": "sha512-hnYFrSefHxYS2XFGtN01x8un0EwNu2bzKvhpRFhgoybIvMaOkkL60IVPmkb5h6XDmUl4IMSB+rT5cIO4/4bJgg==",
+ "version": "9.1.0",
+ "resolved": "https://registry.npmjs.org/mocha/-/mocha-9.1.0.tgz",
+ "integrity": "sha512-Kjg/XxYOFFUi0h/FwMOeb6RoroiZ+P1yOfya6NK7h3dNhahrJx1r2XIT3ge4ZQvJM86mdjNA+W5phqRQh7DwCg==",
"dev": true,
"requires": {
"@ungap/promise-all-settled": "1.1.2",
{
"name": "websub-hub",
- "version": "1.1.5",
+ "version": "1.2.0",
"description": "A WebSub Hub server implementation.",
"main": "server.js",
"scripts": {
"axios": "^0.21.1",
"better-sqlite3": "^7.4.3",
"feedparser": "^2.2.10",
- "htmlparser2": "^6.1.0",
+ "htmlparser2": "^7.0.0",
"iconv": "^3.0.0",
"pg-promise": "^10.11.0"
},
"eslint-plugin-node": "^11.1.0",
"eslint-plugin-security": "^1.4.0",
"eslint-plugin-sonarjs": "^0.10.0",
- "mocha": "^9.0.3",
+ "mocha": "^9.1.0",
"mocha-steps": "^1.3.0",
"nyc": "^15.1.0",
"pre-commit": "^1.2.2",
.filter((h) => h.match(/^sha[0-9]+$/))
.includes(algorithm);
+
+/**
+ * Return an array containing x if x is not an array.
+ * @param {*} x
+ */
+const ensureArray = (x) => {
+ if (x === undefined) {
+ return [];
+ }
+ if (!Array.isArray(x)) {
+ return Array(x);
+ }
+ return x;
+};
+
+
/**
* Recursively freeze an object.
* @param {Object} o
arrayChunk,
attemptRetrySeconds,
axiosResponseLogData,
+ ensureArray,
freezeDeep,
logTruncate,
randomBytesAsync,
}
if (!topic.isActive) {
+ // These should be filtered out when selecting verification tasks to process.
this.logger.debug(_scope, 'topic not active, skipping verification', { verification, requestId });
await this.db.verificationRelease(dbCtx, verificationId);
return;
case Enum.Mode.Unsubscribe:
if (verificationAccepted) {
await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
+ if (topic.isDeleted) {
+ // Remove a deleted topic after the last subscription is notified.
+ await this.db.topicPendingDelete(txCtx, topic.id);
+ }
}
break;
case Enum.Mode.Denied:
await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
+ if (topic.isDeleted) {
+ // Remove a deleted topic after he last subscription is notified.
+ await this.db.topicPendingDelete(txCtx, topic.id);
+ }
break;
default:
throw new Errors.InternalInconsistencyError('no such topic id');
}
+ // Cull any expired subscriptions
+ await this.db.subscriptionDeleteExpired(dbCtx, topicId);
+
logInfoData.url = topicId.url;
if (topic.isDeleted) {
const validHub = await this.linkHelper.validHub(topic.url, response.headers, response.data);
if (!validHub) {
- this.logger.debug(_scope, 'retrieved topic does not list us as hub', { logInfoData });
+ this.logger.info(_scope, 'retrieved topic does not list us as hub', { logInfoData });
if (this.options.communication.strictTopicHubLink) {
await this.db.transaction(dbCtx, async (txCtx) => {
// Set as deleted and set content_updated so subscriptions are notified.
await this.db.topicDeleted(txCtx, topicId);
await this.db.topicFetchComplete(txCtx, topicId);
});
+ // Attempt to remove from db, if no active subscriptions.
+ await this.db.topicPendingDelete(dbCtx, topicId);
return;
}
}
}
+ /**
+ * Remove any expired subscriptions to a topic.
+ * @param {*} dbCtx
+ * @param {*} topicId
+ */
+ async subscriptionDeleteExpired(dbCtx, topicId) {
+ this._notImplemented('subscriptionDeleteExpired', arguments);
+ }
+
+
/**
* Claim subscriptions needing content updates attempted.
* @param {*} dbCtx
this._notImplemented('topicGetAll', arguments);
}
+
/**
* Get topic data, without content.
* @param {*} dbCtx
this._notImplemented('topicGetContentById', arguments);
}
- // /**
- // * Call after an unsubscribe, to check if a topic is awaiting deletion, and that
- // * was the last subscription belaying it.
- // * @param {String|Integer} data topic url or id
- // */
- // async topicPendingDelete(dbCtx, data) {
- // this._notImplemented('topicPendingDelete', arguments);
- // }
+
+ /**
+ * Attempt to delete a topic, which must be set isDeleted, if there
+ * are no more subscriptions belaying its removal.
+ * @param {*} topicId
+ */
+ async topicPendingDelete(dbCtx, topicId) {
+ this._notImplemented('topicPendingDelete', arguments);
+ }
/**
}
+ async subscriptionDeleteExpired(dbCtx, topicId) {
+ const _scope = _fileScope('subscriptionDeleteExpired');
+ this.logger.debug(_scope, 'called', { topicId });
+
+ try {
+ const result = await dbCtx.result(this.statement.subscriptionDeleteExpired, { topicId });
+ this.logger.debug(_scope, 'success', { topicId, deleted: result.rowCount });
+ return this._engineInfo(result);
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, topicId });
+ throw e;
+ }
+ }
+
+
async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
const _scope = _fileScope('subscriptionDeliveryClaim');
this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
}
+ async topicPendingDelete(dbCtx, topicId) {
+ const _scope = _fileScope('topicPendingDelete');
+ this.logger.debug(_scope, 'called', { topicId });
+
+ try {
+ await dbCtx.txIf(async (txCtx) => {
+ const topic = await txCtx.one(this.statement.topicGetById, { topicId });
+ if (!topic.isDeleted) {
+ this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
+ return;
+ }
+
+ const { count: subscriberCount } = await txCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl: topic.url });
+ if (subscriberCount) {
+ this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
+ return;
+ }
+
+ const result = await txCtx.result(this.statement.topicDeleteById, { topicId });
+ if (result.rowCount !== 1) {
+ throw new DBErrors.UnexpectedResult('did not delete topic');
+ }
+ });
+ this.logger.debug(_scope, 'success', { topicId });
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, topicId });
+ throw e;
+ }
+ }
+
+
async topicSet(dbCtx, data) {
const _scope = _fileScope('topicSet');
this.logger.debug(_scope, 'called', data);
--- /dev/null
+--
+DELETE FROM subscription
+WHERE topic_id = $(topicId) AND expires < now()
+
--- /dev/null
+--
+DELETE FROM topic
+WHERE id = $(topicId)
+
}
+ subscriptionDeleteExpired(dbCtx, topicId) {
+ const _scope = _fileScope('subscriptionDeleteExpired');
+ this.logger.debug(_scope, 'called', { topicId });
+
+ try {
+ const result = this.statement.subscriptionDeleteExpired.run({ topicId });
+ this.logger.debug(_scope, 'success', { topicId, deleted: result.changes });
+ return this._engineInfo(result);
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, topicId });
+ throw e;
+ }
+ }
+
+
subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
const _scope = _fileScope('subscriptionDeliveryClaim');
this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
}
+ topicPendingDelete(dbCtx, topicId) {
+ const _scope = _fileScope('topicPendingDelete');
+ this.logger.debug(_scope, 'called', { topicId });
+
+ try {
+ this.db.transaction(() => {
+ const topic = this.statement.topicGetById.get({ topicId });
+ if (!topic.isDeleted) {
+ this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
+ return;
+ }
+
+ const { count: subscriberCount } = this.statement.subscriptionCountByTopicUrl.get({ topicUrl: topic.url });
+ if (subscriberCount) {
+ this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
+ return;
+ }
+
+ const result = this.statement.topicDeleteById.run({ topicId });
+ if (result.changes !== 1) {
+ throw new DBErrors.UnexpectedResult('did not delete topic');
+ }
+ })();
+ this.logger.debug(_scope, 'success', { topicId });
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, topicId });
+ throw e;
+ }
+ }
+
+
topicSet(dbCtx, data) {
const _scope = _fileScope('topicSet');
this.logger.debug(_scope, 'called', data);
--- /dev/null
+--
+DELETE FROM subscription
+WHERE topic_id = :topicId AND expires < strftime('%s', 'now')
--- /dev/null
+--
+DELETE FROM topic
+WHERE id = :topicId
+
if (data.topic) {
topic = await this.db.topicGetByUrl(dbCtx, data.topic);
- if (!topic && this.options.manager.publicHub) {
+ if (!topic && this._newTopicCreationAllowed()) {
this.logger.info(_scope, 'new topic from subscribe request', { data, requestId });
try {
/**
- * Check that a publish request topic is valid and exists,
- * and if it is, add topicId to data.
- * For a public publish request, create topic if not exists.
+ * Determine if a topic url is allowed to be created.
+ * In the future, this may be more complicated.
+ * @returns {Boolean}
+ */
+ _newTopicCreationAllowed() {
+ return this.options.manager.publicHub;
+ }
+
+
+ /**
+ * Check that a publish request's topic(s) are valid and exist,
+ * returning an array with the results for each.
+ * For a public-hub publish request, creates topics if they do not exist.
* @param {*} dbCtx
* @param {RootData} data
* @param {String[]} warn
* @param {String[]} err
* @param {String} requestId
*/
- async _checkPublish(dbCtx, data, warn, err, requestId) {
+ async _publishTopics(dbCtx, data, requestId) {
const _scope = _fileScope('_checkPublish');
- const publishUrl = data.url || data.topic;
+ // Publish requests may include multiple topics, consider them all, but deduplicate.
+ const publishUrls = Array.from(new Set([
+ ...common.ensureArray(data.url),
+ ...common.ensureArray(data.topic),
+ ]));
+
+ // Map the requested topics to their ids, creating if necessary.
+ return Promise.all(publishUrls.map(async (url) => {
+ const result = {
+ url,
+ warn: [],
+ err: [],
+ topicId: undefined,
+ };
+ let topic = await this.db.topicGetByUrl(dbCtx, url);
+ if (!topic && this._newTopicCreationAllowed()) {
+ try {
+ new URL(url);
+ } catch (e) {
+ result.err.push('invalid topic url (failed to parse url)');
+ return result;
+ }
+ await this.db.topicSet(dbCtx, {
+ // TODO: accept a publisherValidationUrl parameter
+ url,
+ });
+ topic = await this.db.topicGetByUrl(dbCtx, url);
+ this.logger.info(_scope, 'new topic from publish request', { url, requestId });
+ }
+ if (!topic || topic.isDeleted) {
+ result.err.push('topic not supported');
+ return result;
+ }
+ result.topicId = topic.id;
+ return result;
+ }));
+ }
- let topic = await this.db.topicGetByUrl(dbCtx, publishUrl);
- if (!topic && this.options.manager.publicHub) {
- this.logger.info(_scope, 'new topic from publish request', { data, requestId });
- try {
- new URL(publishUrl);
- } catch (e) {
- err.push('invalid topic url (failed to parse url)');
- return;
+ /**
+ * Render response for multi-topic publish requests.
+ * @param {Object[]} publishTopics
+ */
+ static multiPublishContent(ctx, publishTopics) {
+ const responses = publishTopics.map((topic) => ({
+ href: topic.url,
+ status: topic.status,
+ statusMessage: topic.statusMessage,
+ errors: topic.err,
+ warnings: topic.warn,
+ }));
+ switch (ctx.responseType) {
+ case Enum.ContentType.ApplicationJson:
+ return JSON.stringify(responses);
+
+ case Enum.ContentType.TextPlain:
+ default: {
+ const textResponses = responses.map((response) => {
+ const details = Manager._prettyDetails(response.errors, response.warnings);
+ const textDetails = (details.length ? '\n' : '') + details.map((d) => `\t${d}`).join('\n');
+ return `${response.href} [${response.status} ${response.statusMessage}]${textDetails}`;
+ });
+ return textResponses.join('\n----\n');
}
+ }
+ }
+
- await this.db.topicSet(dbCtx, {
- url: publishUrl,
- });
- topic = await this.db.topicGetByUrl(dbCtx, publishUrl);
+ /**
+ * Process a publish request.
+ * @param {*} dbCtx
+ * @param {Object} data
+ * @param {http.ServerResponse} res
+ * @param {Object} ctx
+ */
+ async _publishRequest(dbCtx, data, res, ctx) {
+ const _scope = _fileScope('_parsePublish');
+ this.logger.debug(_scope, 'called', { data });
+
+ const requestId = ctx.requestId;
+
+ // Parse and validate all the topics in the request.
+ data.publishTopics = await this._publishTopics(dbCtx, data, requestId);
+ if (!data.publishTopics || !data.publishTopics.length) {
+ const details = Manager._prettyDetails(['no valid topic urls to publish'], []);
+ throw new ResponseError(Enum.ErrorResponse.BadRequest, details);
}
- if (!topic || topic.isDeleted) {
- err.push('not a supported topic');
- return;
+ // Set status per topic
+ for (const topicResult of data.publishTopics) {
+ topicResult.status = topicResult.err.length ? 400 : 202;
+ topicResult.statusMessage = topicResult.err.length ? 'Bad Request' : 'Accepted';
}
- data.topicId = topic.id;
+ // Process the valid publish notifications
+ const validPublishTopics = data.publishTopics.filter((topicResult) => !topicResult.err.length);
+ try {
+ await Promise.all(validPublishTopics.map(async (topicResult) => this.db.topicFetchRequested(dbCtx, topicResult.topicId)));
+ } catch (e) {
+ this.logger.error(_scope, 'topicFetchRequest failed', { error: e, ctx, data, requestId });
+ throw e;
+ }
+
+ this.logger.info(_scope, 'request accepted', { ctx, data, requestId });
+
+ if (data.publishTopics.length === 1) {
+ const soleTopic = data.publishTopics[0];
+ res.statusCode = soleTopic.status;
+ res.end(Manager._prettyDetails(soleTopic.err, soleTopic.warn).join('\n'));
+ } else {
+ res.statusCode = 207;
+ res.end(Manager.multiPublishContent(ctx, data.publishTopics));
+ }
+
+ if (this.options.manager.processImmediately
+ && validPublishTopics.length) {
+ try {
+ await Promise.all(validPublishTopics.map(async (topicResult) => this.communication.topicFetchClaimAndProcessById(dbCtx, topicResult.topicId, requestId)));
+ } catch (e) {
+ this.logger.error(_scope, 'topicFetchClaimAndProcessById failed', { data, validPublishTopics, requestId });
+ // Don't bother re-throwing, as we've already ended this response.
+ }
+ }
+ }
+
+
+ /**
+ * Annotate any encountered issues.
+ * @param {String[]} err
+ * @param {String[]} warn
+ * @returns {String[]}
+ */
+ static _prettyDetails(err, warn) {
+ return [
+ ...err.map((entry) => `error: ${entry}`),
+ ...warn.map((entry) => `warning: ${entry}`),
+ ];
}
await this.db.context(async (dbCtx) => {
+ // Handle publish requests elsewhere
if (data.mode === Enum.Mode.Publish) {
- await this._checkPublish(dbCtx, data, warn, err, requestId);
- } else {
- await this._validateRootData(dbCtx, data, warn, err, requestId);
+ return this._publishRequest(dbCtx, data, res, ctx);
}
- const prettyErr = err.map((entry) => `error: ${entry}`);
- const prettyWarn = warn.map((entry) => `warning: ${entry}`);
- const details = prettyErr.concat(prettyWarn);
+ await this._validateRootData(dbCtx, data, warn, err, requestId);
+
+ const details = Manager._prettyDetails(err, warn);
// Any errors are fatal. Stop and report anything that went wrong.
if (err.length) {
}
// Commit the request for later processing.
- let fn, info, id;
+ let id;
try {
- if (data.mode === Enum.Mode.Publish) {
- fn = 'topicFetchRequested';
- info = await this.db.topicFetchRequested(dbCtx, data.topicId);
- id = data.topicId;
- } else {
- fn = 'verificationInsert';
- id = await this.db.verificationInsert(dbCtx, { ...data, requestId });
- }
+ id = await this.db.verificationInsert(dbCtx, { ...data, requestId });
} catch (e) {
- this.logger.error(_scope, `${fn} failed`, { e, info, data, warn, id, requestId });
+ this.logger.error(_scope, 'verificationInsert failed', { error: e, data, warn, id, requestId });
throw e;
}
if (this.options.manager.processImmediately
&& id) {
try {
- if (data.mode === Enum.Mode.Publish) {
- fn = 'topicFetchClaimAndProcessById';
- await this.communication.topicFetchClaimAndProcessById(dbCtx, id, requestId);
- } else {
- fn = 'verificationClaimAndProcessById';
- await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId);
- }
+ await this.communication.verificationClaimAndProcessById(dbCtx, id, requestId);
} catch (e) {
- this.logger.error(_scope, `${fn} failed`, { ...data, id, requestId });
+ this.logger.error(_scope, 'verificationClaimAndProcessById failed', { ...data, id, requestId });
// Don't bother re-throwing, as we've already ended this response.
}
}
await this.db.topicDeleted(txCtx, topicId);
res.end();
this.logger.info(_scope, 'topic set deleted', { ctx, topicId });
+ // Attempt to remove from db if no active subscriptions.
+ await this.db.topicPendingDelete(txCtx, topicId);
return;
}
});
}); // validHash
+ describe('ensureArray', function () {
+ it('returns empty array for no data', function () {
+ const result = common.ensureArray();
+ assert.deepStrictEqual(result, []);
+ });
+ it('returns same array passed in', function () {
+ const expected = [1, 2, 3, 'foo'];
+ const result = common.ensureArray(expected);
+ assert.deepStrictEqual(result, expected);
+ });
+ it('returns array containing non-array data', function () {
+ const data = 'bar';
+ const result = common.ensureArray(data);
+ assert.deepStrictEqual(result, [data]);
+ });
+ }); // ensureArray
+
}); // Common
assert(communication.db.verificationComplete.called);
});
+ it('unsubscription from deleted topic deletes topic', async function () {
+ communication.db.verificationGetById.restore();
+ verification.mode = 'unsubscribe';
+ sinon.stub(communication.db, 'verificationGetById').resolves(verification);
+ communication.db.topicGetById.restore();
+ sinon.stub(communication.db, 'topicGetById').resolves({
+ ...topic,
+ isDeleted: true,
+ });
+
+ await communication.verificationProcess(dbCtx, callback, topicId, requestId);
+
+ assert(communication.db.subscriptionDelete.called);
+ assert(communication.db.verificationComplete.called);
+ assert(communication.db.topicPendingDelete.called);
+ });
+
it('unsubscription denial succeeds', async function () {
communication.db.verificationGetById.restore();
verification.mode = 'unsubscribe';
}); // Authentication
describe('Topic', function () {
+ let anotherTopicId;
step('requires data', async function () {
try {
await db.context(async (dbCtx) => {
step('deletes a topic', async function () {
await db.context(async (dbCtx) => {
const result = await db.topicSet(dbCtx, testData.anotherTopicSet);
- const anotherTopicId = result.lastInsertRowid;
+ anotherTopicId = result.lastInsertRowid;
await db.topicDeleted(dbCtx, anotherTopicId);
const topic = await db.topicGetById(dbCtx, anotherTopicId);
assert.strictEqual(topic.isDeleted, true);
step('update un-deletes a topic', async function () {
await db.context(async (dbCtx) => {
const result = await db.topicSet(dbCtx, testData.anotherTopicSet);
- const anotherTopicId = result.lastInsertRowid;
+ assert.strictEqual(result.lastInsertRowid, anotherTopicId);
const topic = await db.topicGetById(dbCtx, anotherTopicId);
assert.strictEqual(topic.isDeleted, false);
});
assert(topics.length);
});
});
+ // pending delete of deleted topic with no subscriptions
+ step('really deletes unsubscribed deleted topic', async function() {
+ await db.context(async (dbCtx) => {
+ await db.topicDeleted(dbCtx, anotherTopicId);
+ await db.topicPendingDelete(dbCtx, anotherTopicId);
+ const topic = await db.topicGetById(dbCtx, anotherTopicId);
+ assert(!topic);
+ });
+ });
}); // Topic
describe('Subscription', function () {
assert(!subscription);
});
});
+ step('create expired subscription', async function () {
+ const data = {
+ ...testData.subscriptionUpsert,
+ secret: 'newSecret',
+ topicId,
+ leaseSeconds: -1,
+ };
+ await db.context(async (dbCtx) => {
+ const result = await db.subscriptionUpsert(dbCtx, data);
+ assert(result.lastInsertRowid);
+ assert.notStrictEqual(result.lastInsertRowid, subscriptionId);
+ subscriptionId = result.lastInsertRowid;
+ assert.strictEqual(result.changes, 1);
+ });
+ });
+ step('delete expired subscriptions', async function() {
+ await db.context(async (dbCtx) => {
+ await db.subscriptionDeleteExpired(dbCtx, topicId)
+ const subscription = await db.subscriptionGet(dbCtx, testData.subscriptionUpsert.callback, topicId);
+ assert(!subscription);
+ });
+ });
}); // Subscription
describe('Verification', function () {
changes: 1,
lastInsertRowid: undefined,
duration: 10,
- }
+ };
sinon.stub(db.db, 'result').resolves(dbResult);
const result = await db.subscriptionDelete(dbCtx, callback, topicId);
assert.deepStrictEqual(result, expected);
});
}); // subscriptionDelete
+ describe('subscriptionDeleteExpired', function () {
+ it('success', async function () {
+ const dbResult = {
+ rowCount: 1,
+ rows: [],
+ duration: 10,
+ };
+ const expected = {
+ changes: 1,
+ lastInsertRowid: undefined,
+ duration: 10,
+ };
+ sinon.stub(db.db, 'result').resolves(dbResult);
+ const result = await db.subscriptionDeleteExpired(dbCtx, topicId);
+ assert.deepStrictEqual(result, expected);
+ });
+ it('failure', async function() {
+ const expected = new Error();
+ sinon.stub(db.db, 'result').rejects(expected);
+ try {
+ await db.subscriptionDeleteExpired(dbCtx, topicId);
+ assert.fail(noExpectedException);
+ } catch (e) {
+ assert.deepStrictEqual(e, expected);
+ }
+ });
+ });
+
describe('subscriptionDeliveryClaim', function () {
it('success', async function() {
const dbResult = [
});
}); // topicGetContentById
+ describe('topicPendingDelete', function () {
+ beforeEach(function () {
+ sinon.stub(db.db, 'one');
+ sinon.stub(db.db, 'result');
+ });
+ it('success', async function () {
+ db.db.one.onCall(0).resolves({
+ id: topicId,
+ isDeleted: true,
+ }).onCall(1).resolves({
+ count: 0,
+ });
+ const dbResult = {
+ rowCount: 1,
+ rows: [],
+ duration: 10,
+ };
+ db.db.result.resolves(dbResult);
+ await db.topicPendingDelete(dbCtx, topicId);
+ assert(db.db.result.called);
+ });
+ it('does not delete non-deleted topic', async function () {
+ db.db.one.onCall(0).resolves({
+ id: topicId,
+ isDeleted: false,
+ }).onCall(1).resolves({
+ count: 0,
+ });
+ await db.topicPendingDelete(dbCtx, topicId);
+ assert(!db.db.result.called);
+ });
+ it('does not delete topic with active subscriptions', async function () {
+ db.db.one.onCall(0).resolves({
+ id: topicId,
+ isDeleted: true,
+ }).onCall(1).resolves({
+ count: 10,
+ });
+ await db.topicPendingDelete(dbCtx, topicId);
+ assert(!db.db.result.called);
+ });
+ it('covers no deletion', async function () {
+ db.db.one.onCall(0).resolves({
+ id: topicId,
+ isDeleted: true,
+ }).onCall(1).resolves({
+ count: 0,
+ });
+ const dbResult = {
+ rowCount: 0,
+ rows: [],
+ duration: 10,
+ };
+ db.db.result.resolves(dbResult);
+ try {
+ await db.topicPendingDelete(dbCtx, topicId);
+ assert.fail(noExpectedException);
+ } catch (e) {
+ assert(e instanceof DBErrors.UnexpectedResult);
+ }
+ });
+ });
+
describe('topicSet', function () {
let data;
beforeEach(function () {
});
}); // subscriptionDelete
+ describe('subscriptionDeleteExpired', function () {
+ it('success', async function () {
+ const dbResult = {
+ changes: 1,
+ lastInsertRowid: undefined,
+ };
+ const expected = {
+ changes: 1,
+ lastInsertRowid: undefined,
+ };
+ sinon.stub(db.statement.subscriptionDeleteExpired, 'run').returns(dbResult);
+ const result = await db.subscriptionDeleteExpired(dbCtx, topicId);
+ assert.deepStrictEqual(result, expected);
+ });
+ it('failure', async function () {
+ const expected = new Error();
+ sinon.stub(db.statement.subscriptionDeleteExpired, 'run').throws(expected);
+ try {
+ await db.subscriptionDeleteExpired(dbCtx, topicId);
+ assert.fail(noExpectedException);
+ } catch (e) {
+ assert.deepStrictEqual(e, expected);
+ }
+ });
+ });
+
describe('subscriptionDeliveryClaim', function () {
- it('success', async function() {
+ it('success', async function () {
const dbAllResult = [
{
id: 'c2e254c5-aa6e-4a8f-b1a1-e474b07392bb',
});
}); // topicGetContentById
+ describe('topicPendingDelete', function () {
+ beforeEach(function () {
+ sinon.stub(db.statement.topicGetById, 'get');
+ sinon.stub(db.statement.subscriptionCountByTopicUrl, 'get');
+ sinon.stub(db.statement.topicDeleteById, 'run');
+ });
+ it('success', async function () {
+ db.statement.topicGetById.get.returns({
+ id: topicId,
+ isDeleted: true,
+ });
+ db.statement.subscriptionCountByTopicUrl.get.returns({
+ count: 0,
+ });
+ db.statement.topicDeleteById.run.returns({
+ changes: 1,
+ });
+ db.topicPendingDelete(dbCtx, topicId);
+ assert(db.statement.topicDeleteById.run.called);
+ });
+ it('does not delete non-deleted topic', async function () {
+ db.statement.topicGetById.get.returns({
+ id: topicId,
+ isDeleted: false,
+ });
+ db.statement.subscriptionCountByTopicUrl.get.returns({
+ count: 0,
+ });
+ db.statement.topicDeleteById.run.returns({
+ changes: 1,
+ });
+ db.topicPendingDelete(dbCtx, topicId);
+ assert(!db.statement.topicDeleteById.run.called);
+ });
+ it('does not delete topic with active subscriptions', async function () {
+ db.statement.topicGetById.get.returns({
+ id: topicId,
+ isDeleted: true,
+ });
+ db.statement.subscriptionCountByTopicUrl.get.returns({
+ count: 10,
+ });
+ db.statement.topicDeleteById.run.returns({
+ changes: 1,
+ });
+ db.topicPendingDelete(dbCtx, topicId);
+ assert(!db.statement.topicDeleteById.run.called);
+ });
+ it('covers no deletion', async function () {
+ db.statement.topicGetById.get.returns({
+ id: topicId,
+ isDeleted: true,
+ });
+ db.statement.subscriptionCountByTopicUrl.get.returns({
+ count: 0,
+ });
+ db.statement.topicDeleteById.run.returns({
+ changes: 0,
+ });
+ try {
+ db.topicPendingDelete(dbCtx, topicId);
+ assert.fail(noExpectedException);
+
+ } catch (e) {
+ assert(e instanceof DBErrors.UnexpectedResult);
+ }
+ assert(db.statement.topicDeleteById.run.called);
+ });
+ });
+
describe('topicSet', function () {
let data;
beforeEach(function () {
manager = new Manager(stubLogger, stubDb, options);
sinon.stub(manager.communication, 'verificationProcess');
sinon.stub(manager.communication, 'topicFetchProcess');
+ sinon.stub(manager.communication, 'topicFetchClaimAndProcessById');
stubDb._reset();
stubLogger._reset();
});
await manager.getAdminOverview(res, ctx);
assert(res.end.called);
});
- });
+ }); // getAdminOverview
describe('getTopicDetails', function () {
it('covers', async function() {
});
}); // _checkMode
- describe('_checkPublish', function () {
- let dbCtx, data, warn, err, requestId;
+ describe('_publishTopics', function () {
+ let dbCtx, data, requestId;
beforeEach(function () {
dbCtx = {};
data = {};
- warn = [];
- err = [];
requestId = 'blah';
});
it('succeeds', async function () {
id: 222,
});
Object.assign(data, testData.validPublishRootData);
- await manager._checkPublish(dbCtx, data, warn, err, requestId);
- assert.strictEqual(warn.length, 0, 'unexpected warnings length');
- assert.strictEqual(err.length, 0, 'unexpected errors length');
- assert.strictEqual(data.topicId, 222, 'unexpected topic id');
+ const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+ assert.strictEqual(topicResults.length, 1);
+ assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length');
+ assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length');
+ assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id');
});
it('fails bad url', async function () {
Object.assign(data, testData.validPublishRootData, { topic: 'not_a_url' });
- await manager._checkPublish(dbCtx, data, warn, err, requestId);
- assert.strictEqual(err.length, 1, 'unexpected errors length');
- assert.strictEqual(warn.length, 0);
+ const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+ assert.strictEqual(topicResults.length, 1);
+ assert.strictEqual(topicResults[0].err.length, 1, 'unexpected errors length');
+ assert.strictEqual(topicResults[0].warn.length, 0);
});
it('accepts new public publish topic', async function () {
manager.db.topicGetByUrl.onCall(0).resolves().onCall(1).resolves({
id: 222,
});
Object.assign(data, testData.validPublishRootData);
- await manager._checkPublish(dbCtx, data, warn, err, requestId);
- assert.strictEqual(warn.length, 0, 'unexpected warnings length');
- assert.strictEqual(err.length, 0, 'unexpected errors length');
- assert.strictEqual(data.topicId, 222, 'unexpected topic id');
+ const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+ assert.strictEqual(topicResults.length, 1);
+ assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length');
+ assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length');
+ assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id');
});
it('does not publish deleted topic', async function () {
manager.db.topicGetByUrl.resolves({
isDeleted: true,
});
Object.assign(data, testData.validPublishRootData);
- await manager._checkPublish(dbCtx, data, warn, err, requestId);
- assert.strictEqual(warn.length, 0, 'unexpected warnings length');
- assert.strictEqual(err.length, 1, 'unexpected errors length');
- assert.strictEqual(data.topicId, undefined, 'unexpected topic id');
+ const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+ assert.strictEqual(topicResults.length, 1);
+ assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length');
+ assert.strictEqual(topicResults[0].err.length, 1, 'unexpected errors length');
+ assert.strictEqual(topicResults[0].topicId, undefined, 'unexpected topic id');
+ });
+ it('no topics', async function() {
+ Object.assign(data, testData.validPublishRootData);
+ delete data.topic;
+ const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+ assert.strictEqual(topicResults.length, 0);
+ });
+ it('multiple valid topics', async function () {
+ manager.db.topicGetByUrl.resolves({
+ id: 222,
+ });
+ Object.assign(data, testData.validPublishRootData);
+ data.url = ['https://example.com/first', 'https://example.com/second'];
+ data.topic = ['https://example.com/third'];
+ const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+ assert.strictEqual(topicResults.length, 3);
+ assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length');
+ assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length');
+ assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id');
+ assert.strictEqual(topicResults[1].warn.length, 0, 'unexpected warnings length');
+ assert.strictEqual(topicResults[1].err.length, 0, 'unexpected errors length');
+ assert.strictEqual(topicResults[1].topicId, 222, 'unexpected topic id');
+ assert.strictEqual(topicResults[2].warn.length, 0, 'unexpected warnings length');
+ assert.strictEqual(topicResults[2].err.length, 0, 'unexpected errors length');
+ assert.strictEqual(topicResults[2].topicId, 222, 'unexpected topic id');
+ });
+ it('mix of valid and invalid topics', async function () {
+ manager.db.topicGetByUrl.onCall(1).resolves().resolves({
+ id: 222,
+ });
+ Object.assign(data, testData.validPublishRootData);
+ data.url = ['https://example.com/first', 'not a url'];
+ data.topic = ['https://example.com/third'];
+ const topicResults = await manager._publishTopics(dbCtx, data, requestId);
+ assert.strictEqual(topicResults.length, 3);
+ assert.strictEqual(topicResults[0].warn.length, 0, 'unexpected warnings length');
+ assert.strictEqual(topicResults[0].err.length, 0, 'unexpected errors length');
+ assert.strictEqual(topicResults[0].topicId, 222, 'unexpected topic id');
+ assert.strictEqual(topicResults[1].warn.length, 0, 'unexpected warnings length');
+ assert.strictEqual(topicResults[1].err.length, 1, 'unexpected errors length');
+ assert.strictEqual(topicResults[1].topicId, undefined, 'unexpected topic id');
+ assert.strictEqual(topicResults[2].warn.length, 0, 'unexpected warnings length');
+ assert.strictEqual(topicResults[2].err.length, 0, 'unexpected errors length');
+ assert.strictEqual(topicResults[2].topicId, 222, 'unexpected topic id');
+ });
+ }); // _publishTopics
+
+ describe('_publishRequest', function () {
+ let dbCtx, data, res, ctx;
+ beforeEach(function () {
+ dbCtx = {};
+ data = {};
+ res = {
+ end: sinon.stub(),
+ };
+ ctx = {};
+ });
+ it('requires a topic', async function () {
+ try {
+ await manager._publishRequest(dbCtx, data, res, ctx);
+ assert.fail(noExpectedException);
+ } catch (e) {
+ assert(e instanceof Errors.ResponseError);
+ }
+ });
+ it('processes one topic', async function() {
+ manager.db.topicGetByUrl.resolves({
+ id: 222,
+ });
+ Object.assign(data, testData.validPublishRootData);
+ manager.db.topicFetchRequested.resolves();
+ await manager._publishRequest(dbCtx, data, res, ctx);
+ assert(manager.db.topicFetchRequested.called);
+ assert.strictEqual(res.statusCode, 202);
+ assert(res.end.called);
+ });
+ it('processes mix of valid and invalid topics', async function () {
+ ctx.responseType = 'application/json';
+ manager.db.topicGetByUrl.onCall(1).resolves().resolves({
+ id: 222,
+ });
+ Object.assign(data, testData.validPublishRootData);
+ data.url = ['https://example.com/first', 'not a url'];
+ data.topic = ['https://example.com/third'];
+ await manager._publishRequest(dbCtx, data, res, ctx);
+ assert.strictEqual(res.statusCode, 207);
+ assert(res.end.called);
+ });
+ it('covers topicFetchRequest failure', async function () {
+ manager.db.topicGetByUrl.resolves({
+ id: 222,
+ });
+ Object.assign(data, testData.validPublishRootData);
+ const expected = new Error('boo');
+ manager.db.topicFetchRequested.rejects(expected);
+ try {
+ await manager._publishRequest(dbCtx, data, res, ctx);
+ assert.fail(noExpectedException);
+ } catch (e) {
+ assert.deepStrictEqual(e, expected);
+ }
});
- }); // _checkPublish
+ it('covers immediate processing error', async function() {
+ manager.options.manager.processImmediately = true;
+ manager.db.topicGetByUrl.onCall(0).resolves().onCall(1).resolves({
+ id: 222,
+ });
+ manager.communication.topicFetchClaimAndProcessById.rejects();
+ Object.assign(data, testData.validPublishRootData);
+ await manager._publishRequest(dbCtx, data, res, ctx);
+ assert(manager.db.topicFetchRequested.called);
+ assert.strictEqual(res.statusCode, 202);
+ assert(res.end.called);
+ assert(manager.communication.topicFetchClaimAndProcessById.called)
+ });
+ it('covers no immediate processing', async function() {
+ manager.options.manager.processImmediately = false;
+ manager.db.topicGetByUrl.onCall(0).resolves().onCall(1).resolves({
+ id: 222,
+ });
+ Object.assign(data, testData.validPublishRootData);
+ await manager._publishRequest(dbCtx, data, res, ctx);
+ assert(manager.db.topicFetchRequested.called);
+ assert.strictEqual(res.statusCode, 202);
+ assert(res.end.called);
+ assert(!manager.communication.topicFetchClaimAndProcessById.called)
+ });
+ }); // _publishRequest
+
+ describe('multiPublishContent', function () {
+ let publishTopics;
+ beforeEach(function () {
+ publishTopics = [{
+ url: 'https://example.com/first',
+ warn: [],
+ err: [],
+ topicId: 222,
+ status: 202,
+ statusMessage: 'Accepted',
+ },
+ {
+ url: 'not a url',
+ warn: [],
+ err: [ 'invalid topic url (failed to parse url)' ],
+ topicId: undefined,
+ status: 400,
+ statusMessage: 'Bad Request',
+ }];
+ });
+ it('covers json response', function () {
+ ctx.responseType = 'application/json';
+ const expected = '[{"href":"https://example.com/first","status":202,"statusMessage":"Accepted","errors":[],"warnings":[]},{"href":"not a url","status":400,"statusMessage":"Bad Request","errors":["invalid topic url (failed to parse url)"],"warnings":[]}]';
+ const result = Manager.multiPublishContent(ctx, publishTopics);
+ assert.deepStrictEqual(result, expected);
+ });
+ it('covers text response', function () {
+ ctx.responseType = 'text/plain';
+ const expected = `https://example.com/first [202 Accepted]
+----
+not a url [400 Bad Request]
+\terror: invalid topic url (failed to parse url)`;
+ const result = Manager.multiPublishContent(ctx, publishTopics);
+ assert.deepStrictEqual(result, expected);
+ });
+ }); // multiPublishContent
describe('processTasks', function () {
it('covers', async function () {
'subscriptionsByTopicId',
'subscriptionCountByTopicUrl',
'subscriptionDelete',
+ 'subscriptionDeleteExpired',
'subscriptionDeliveryClaim',
'subscriptionDeliveryClaimById',
'subscriptionDeliveryComplete',
'topicGetById',
'topicGetByUrl',
'topicGetContentById',
+ 'topicPendingDelete',
'topicSet',
'topicSetContent',
'topicUpdate',