From: Justin Wind Date: Sat, 21 Aug 2021 21:51:47 +0000 (-0700) Subject: expired subscriptions and deleted topics with no subscribers are now removed from... X-Git-Tag: v1.2.0^2~2 X-Git-Url: https://git.squeep.com/?a=commitdiff_plain;h=9812213260e952ae601f94ab0915c680e8c80495;p=websub-hub expired subscriptions and deleted topics with no subscribers are now removed from the database Expired subscriptions are removed when a topic is updated, and topics set to deleted state are removed when the last subscriber is notified. --- diff --git a/CHANGELOG.md b/CHANGELOG.md index a8521de..c14c493 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ Releases and notable changes to this project are documented here. ### Added - Accept multiple topics in publish requests. +- Expired subscription entries are removed from the database when their topics are updated. +- Topics which have been marked deleted are removed from the database after all subscribers have been notified. ## [v1.1.5] - 2021-08-23 diff --git a/src/communication.js b/src/communication.js index 345714f..097da63 100644 --- a/src/communication.js +++ b/src/communication.js @@ -223,6 +223,7 @@ class Communication { } if (!topic.isActive) { + // These should be filtered out when selecting verification tasks to process. this.logger.debug(_scope, 'topic not active, skipping verification', { verification, requestId }); await this.db.verificationRelease(dbCtx, verificationId); return; @@ -328,11 +329,19 @@ class Communication { case Enum.Mode.Unsubscribe: if (verificationAccepted) { await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId); + if (topic.isDeleted) { + // Remove a deleted topic after the last subscription is notified. + await this.db.topicPendingDelete(txCtx, topic.id); + } } break; case Enum.Mode.Denied: await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId); + if (topic.isDeleted) { + // Remove a deleted topic after he last subscription is notified. + await this.db.topicPendingDelete(txCtx, topic.id); + } break; default: @@ -431,6 +440,9 @@ class Communication { throw new Errors.InternalInconsistencyError('no such topic id'); } + // Cull any expired subscriptions + await this.db.subscriptionDeleteExpired(dbCtx, topicId); + logInfoData.url = topicId.url; if (topic.isDeleted) { @@ -479,13 +491,15 @@ class Communication { const validHub = await this.linkHelper.validHub(topic.url, response.headers, response.data); if (!validHub) { - this.logger.debug(_scope, 'retrieved topic does not list us as hub', { logInfoData }); + this.logger.info(_scope, 'retrieved topic does not list us as hub', { logInfoData }); if (this.options.communication.strictTopicHubLink) { await this.db.transaction(dbCtx, async (txCtx) => { // Set as deleted and set content_updated so subscriptions are notified. await this.db.topicDeleted(txCtx, topicId); await this.db.topicFetchComplete(txCtx, topicId); }); + // Attempt to remove from db, if no active subscriptions. + await this.db.topicPendingDelete(dbCtx, topicId); return; } } diff --git a/src/db/base.js b/src/db/base.js index 95c9010..de0cd44 100644 --- a/src/db/base.js +++ b/src/db/base.js @@ -351,6 +351,16 @@ class Database { } + /** + * Remove any expired subscriptions to a topic. + * @param {*} dbCtx + * @param {*} topicId + */ + async subscriptionDeleteExpired(dbCtx, topicId) { + this._notImplemented('subscriptionDeleteExpired', arguments); + } + + /** * Claim subscriptions needing content updates attempted. * @param {*} dbCtx @@ -533,6 +543,7 @@ class Database { this._notImplemented('topicGetAll', arguments); } + /** * Get topic data, without content. * @param {*} dbCtx @@ -563,14 +574,15 @@ class Database { this._notImplemented('topicGetContentById', arguments); } - // /** - // * Call after an unsubscribe, to check if a topic is awaiting deletion, and that - // * was the last subscription belaying it. - // * @param {String|Integer} data topic url or id - // */ - // async topicPendingDelete(dbCtx, data) { - // this._notImplemented('topicPendingDelete', arguments); - // } + + /** + * Attempt to delete a topic, which must be set isDeleted, if there + * are no more subscriptions belaying its removal. + * @param {*} topicId + */ + async topicPendingDelete(dbCtx, topicId) { + this._notImplemented('topicPendingDelete', arguments); + } /** diff --git a/src/db/postgres/index.js b/src/db/postgres/index.js index 2559097..213fa10 100644 --- a/src/db/postgres/index.js +++ b/src/db/postgres/index.js @@ -421,6 +421,21 @@ class DatabasePostgres extends Database { } + async subscriptionDeleteExpired(dbCtx, topicId) { + const _scope = _fileScope('subscriptionDeleteExpired'); + this.logger.debug(_scope, 'called', { topicId }); + + try { + const result = await dbCtx.result(this.statement.subscriptionDeleteExpired, { topicId }); + this.logger.debug(_scope, 'success', { topicId, deleted: result.rowCount }); + return this._engineInfo(result); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, topicId }); + throw e; + } + } + + async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) { const _scope = _fileScope('subscriptionDeliveryClaim'); this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant }); @@ -804,6 +819,37 @@ class DatabasePostgres extends Database { } + async topicPendingDelete(dbCtx, topicId) { + const _scope = _fileScope('topicPendingDelete'); + this.logger.debug(_scope, 'called', { topicId }); + + try { + await dbCtx.txIf(async (txCtx) => { + const topic = await txCtx.one(this.statement.topicGetById, { topicId }); + if (!topic.isDeleted) { + this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId }); + return; + } + + const { count: subscriberCount } = await txCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl: topic.url }); + if (subscriberCount) { + this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount }); + return; + } + + const result = await txCtx.result(this.statement.topicDeleteById, { topicId }); + if (result.rowCount !== 1) { + throw new DBErrors.UnexpectedResult('did not delete topic'); + } + }); + this.logger.debug(_scope, 'success', { topicId }); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, topicId }); + throw e; + } + } + + async topicSet(dbCtx, data) { const _scope = _fileScope('topicSet'); this.logger.debug(_scope, 'called', data); diff --git a/src/db/postgres/sql/subscription-delete-expired.sql b/src/db/postgres/sql/subscription-delete-expired.sql new file mode 100644 index 0000000..d0f96e7 --- /dev/null +++ b/src/db/postgres/sql/subscription-delete-expired.sql @@ -0,0 +1,4 @@ +-- +DELETE FROM subscription +WHERE topic_id = $(topicId) AND expires < now() + diff --git a/src/db/postgres/sql/topic-delete-by-id.sql b/src/db/postgres/sql/topic-delete-by-id.sql new file mode 100644 index 0000000..1378016 --- /dev/null +++ b/src/db/postgres/sql/topic-delete-by-id.sql @@ -0,0 +1,4 @@ +-- +DELETE FROM topic +WHERE id = $(topicId) + diff --git a/src/db/sqlite/index.js b/src/db/sqlite/index.js index fba4e7c..07d6633 100644 --- a/src/db/sqlite/index.js +++ b/src/db/sqlite/index.js @@ -359,6 +359,21 @@ class DatabaseSQLite extends Database { } + subscriptionDeleteExpired(dbCtx, topicId) { + const _scope = _fileScope('subscriptionDeleteExpired'); + this.logger.debug(_scope, 'called', { topicId }); + + try { + const result = this.statement.subscriptionDeleteExpired.run({ topicId }); + this.logger.debug(_scope, 'success', { topicId, deleted: result.changes }); + return this._engineInfo(result); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, topicId }); + throw e; + } + } + + subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) { const _scope = _fileScope('subscriptionDeliveryClaim'); this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant }); @@ -768,6 +783,37 @@ class DatabaseSQLite extends Database { } + topicPendingDelete(dbCtx, topicId) { + const _scope = _fileScope('topicPendingDelete'); + this.logger.debug(_scope, 'called', { topicId }); + + try { + this.db.transaction(() => { + const topic = this.statement.topicGetById.get({ topicId }); + if (!topic.isDeleted) { + this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId }); + return; + } + + const { count: subscriberCount } = this.statement.subscriptionCountByTopicUrl.get({ topicUrl: topic.url }); + if (subscriberCount) { + this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount }); + return; + } + + const result = this.statement.topicDeleteById.run({ topicId }); + if (result.changes !== 1) { + throw new DBErrors.UnexpectedResult('did not delete topic'); + } + })(); + this.logger.debug(_scope, 'success', { topicId }); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, topicId }); + throw e; + } + } + + topicSet(dbCtx, data) { const _scope = _fileScope('topicSet'); this.logger.debug(_scope, 'called', data); diff --git a/src/db/sqlite/sql/subscription-delete-expired.sql b/src/db/sqlite/sql/subscription-delete-expired.sql new file mode 100644 index 0000000..fae9d2c --- /dev/null +++ b/src/db/sqlite/sql/subscription-delete-expired.sql @@ -0,0 +1,3 @@ +-- +DELETE FROM subscription +WHERE topic_id = :topicId AND expires < strftime('%s', 'now') diff --git a/src/db/sqlite/sql/topic-delete-by-id.sql b/src/db/sqlite/sql/topic-delete-by-id.sql new file mode 100644 index 0000000..a66364a --- /dev/null +++ b/src/db/sqlite/sql/topic-delete-by-id.sql @@ -0,0 +1,4 @@ +-- +DELETE FROM topic +WHERE id = :topicId + diff --git a/src/manager.js b/src/manager.js index 0f11068..cae9e74 100644 --- a/src/manager.js +++ b/src/manager.js @@ -626,6 +626,8 @@ class Manager { await this.db.topicDeleted(txCtx, topicId); res.end(); this.logger.info(_scope, 'topic set deleted', { ctx, topicId }); + // Attempt to remove from db if no active subscriptions. + await this.db.topicPendingDelete(txCtx, topicId); return; } diff --git a/test/src/communication.js b/test/src/communication.js index ca5f34e..960a0f9 100644 --- a/test/src/communication.js +++ b/test/src/communication.js @@ -405,6 +405,23 @@ describe('Communication', function () { assert(communication.db.verificationComplete.called); }); + it('unsubscription from deleted topic deletes topic', async function () { + communication.db.verificationGetById.restore(); + verification.mode = 'unsubscribe'; + sinon.stub(communication.db, 'verificationGetById').resolves(verification); + communication.db.topicGetById.restore(); + sinon.stub(communication.db, 'topicGetById').resolves({ + ...topic, + isDeleted: true, + }); + + await communication.verificationProcess(dbCtx, callback, topicId, requestId); + + assert(communication.db.subscriptionDelete.called); + assert(communication.db.verificationComplete.called); + assert(communication.db.topicPendingDelete.called); + }); + it('unsubscription denial succeeds', async function () { communication.db.verificationGetById.restore(); verification.mode = 'unsubscribe'; diff --git a/test/src/db/integration.js b/test/src/db/integration.js index e6f632c..8d5b615 100644 --- a/test/src/db/integration.js +++ b/test/src/db/integration.js @@ -114,6 +114,7 @@ describe('Database Integration', function () { }); // Authentication describe('Topic', function () { + let anotherTopicId; step('requires data', async function () { try { await db.context(async (dbCtx) => { @@ -222,7 +223,7 @@ describe('Database Integration', function () { step('deletes a topic', async function () { await db.context(async (dbCtx) => { const result = await db.topicSet(dbCtx, testData.anotherTopicSet); - const anotherTopicId = result.lastInsertRowid; + anotherTopicId = result.lastInsertRowid; await db.topicDeleted(dbCtx, anotherTopicId); const topic = await db.topicGetById(dbCtx, anotherTopicId); assert.strictEqual(topic.isDeleted, true); @@ -231,7 +232,7 @@ describe('Database Integration', function () { step('update un-deletes a topic', async function () { await db.context(async (dbCtx) => { const result = await db.topicSet(dbCtx, testData.anotherTopicSet); - const anotherTopicId = result.lastInsertRowid; + assert.strictEqual(result.lastInsertRowid, anotherTopicId); const topic = await db.topicGetById(dbCtx, anotherTopicId); assert.strictEqual(topic.isDeleted, false); }); @@ -242,6 +243,15 @@ describe('Database Integration', function () { assert(topics.length); }); }); + // pending delete of deleted topic with no subscriptions + step('really deletes unsubscribed deleted topic', async function() { + await db.context(async (dbCtx) => { + await db.topicDeleted(dbCtx, anotherTopicId); + await db.topicPendingDelete(dbCtx, anotherTopicId); + const topic = await db.topicGetById(dbCtx, anotherTopicId); + assert(!topic); + }); + }); }); // Topic describe('Subscription', function () { @@ -372,6 +382,28 @@ describe('Database Integration', function () { assert(!subscription); }); }); + step('create expired subscription', async function () { + const data = { + ...testData.subscriptionUpsert, + secret: 'newSecret', + topicId, + leaseSeconds: -1, + }; + await db.context(async (dbCtx) => { + const result = await db.subscriptionUpsert(dbCtx, data); + assert(result.lastInsertRowid); + assert.notStrictEqual(result.lastInsertRowid, subscriptionId); + subscriptionId = result.lastInsertRowid; + assert.strictEqual(result.changes, 1); + }); + }); + step('delete expired subscriptions', async function() { + await db.context(async (dbCtx) => { + await db.subscriptionDeleteExpired(dbCtx, topicId) + const subscription = await db.subscriptionGet(dbCtx, testData.subscriptionUpsert.callback, topicId); + assert(!subscription); + }); + }); }); // Subscription describe('Verification', function () { diff --git a/test/src/db/postgres.js b/test/src/db/postgres.js index ef47905..0f037d6 100644 --- a/test/src/db/postgres.js +++ b/test/src/db/postgres.js @@ -557,7 +557,7 @@ describe('DatabasePostgres', function () { changes: 1, lastInsertRowid: undefined, duration: 10, - } + }; sinon.stub(db.db, 'result').resolves(dbResult); const result = await db.subscriptionDelete(dbCtx, callback, topicId); assert.deepStrictEqual(result, expected); @@ -574,6 +574,34 @@ describe('DatabasePostgres', function () { }); }); // subscriptionDelete + describe('subscriptionDeleteExpired', function () { + it('success', async function () { + const dbResult = { + rowCount: 1, + rows: [], + duration: 10, + }; + const expected = { + changes: 1, + lastInsertRowid: undefined, + duration: 10, + }; + sinon.stub(db.db, 'result').resolves(dbResult); + const result = await db.subscriptionDeleteExpired(dbCtx, topicId); + assert.deepStrictEqual(result, expected); + }); + it('failure', async function() { + const expected = new Error(); + sinon.stub(db.db, 'result').rejects(expected); + try { + await db.subscriptionDeleteExpired(dbCtx, topicId); + assert.fail(noExpectedException); + } catch (e) { + assert.deepStrictEqual(e, expected); + } + }); + }); + describe('subscriptionDeliveryClaim', function () { it('success', async function() { const dbResult = [ @@ -1214,6 +1242,69 @@ describe('DatabasePostgres', function () { }); }); // topicGetContentById + describe('topicPendingDelete', function () { + beforeEach(function () { + sinon.stub(db.db, 'one'); + sinon.stub(db.db, 'result'); + }); + it('success', async function () { + db.db.one.onCall(0).resolves({ + id: topicId, + isDeleted: true, + }).onCall(1).resolves({ + count: 0, + }); + const dbResult = { + rowCount: 1, + rows: [], + duration: 10, + }; + db.db.result.resolves(dbResult); + await db.topicPendingDelete(dbCtx, topicId); + assert(db.db.result.called); + }); + it('does not delete non-deleted topic', async function () { + db.db.one.onCall(0).resolves({ + id: topicId, + isDeleted: false, + }).onCall(1).resolves({ + count: 0, + }); + await db.topicPendingDelete(dbCtx, topicId); + assert(!db.db.result.called); + }); + it('does not delete topic with active subscriptions', async function () { + db.db.one.onCall(0).resolves({ + id: topicId, + isDeleted: true, + }).onCall(1).resolves({ + count: 10, + }); + await db.topicPendingDelete(dbCtx, topicId); + assert(!db.db.result.called); + }); + it('covers no deletion', async function () { + db.db.one.onCall(0).resolves({ + id: topicId, + isDeleted: true, + }).onCall(1).resolves({ + count: 0, + }); + const dbResult = { + rowCount: 0, + rows: [], + duration: 10, + }; + db.db.result.resolves(dbResult); + try { + await db.topicPendingDelete(dbCtx, topicId); + assert.fail(noExpectedException); + } catch (e) { + assert(e instanceof DBErrors.UnexpectedResult); + } + }); + }); + describe('topicSet', function () { let data; beforeEach(function () { diff --git a/test/src/db/sqlite.js b/test/src/db/sqlite.js index 0c96df3..370d559 100644 --- a/test/src/db/sqlite.js +++ b/test/src/db/sqlite.js @@ -404,8 +404,34 @@ describe('DatabaseSQLite', function () { }); }); // subscriptionDelete + describe('subscriptionDeleteExpired', function () { + it('success', async function () { + const dbResult = { + changes: 1, + lastInsertRowid: undefined, + }; + const expected = { + changes: 1, + lastInsertRowid: undefined, + }; + sinon.stub(db.statement.subscriptionDeleteExpired, 'run').returns(dbResult); + const result = await db.subscriptionDeleteExpired(dbCtx, topicId); + assert.deepStrictEqual(result, expected); + }); + it('failure', async function () { + const expected = new Error(); + sinon.stub(db.statement.subscriptionDeleteExpired, 'run').throws(expected); + try { + await db.subscriptionDeleteExpired(dbCtx, topicId); + assert.fail(noExpectedException); + } catch (e) { + assert.deepStrictEqual(e, expected); + } + }); + }); + describe('subscriptionDeliveryClaim', function () { - it('success', async function() { + it('success', async function () { const dbAllResult = [ { id: 'c2e254c5-aa6e-4a8f-b1a1-e474b07392bb', @@ -1021,6 +1047,76 @@ describe('DatabaseSQLite', function () { }); }); // topicGetContentById + describe('topicPendingDelete', function () { + beforeEach(function () { + sinon.stub(db.statement.topicGetById, 'get'); + sinon.stub(db.statement.subscriptionCountByTopicUrl, 'get'); + sinon.stub(db.statement.topicDeleteById, 'run'); + }); + it('success', async function () { + db.statement.topicGetById.get.returns({ + id: topicId, + isDeleted: true, + }); + db.statement.subscriptionCountByTopicUrl.get.returns({ + count: 0, + }); + db.statement.topicDeleteById.run.returns({ + changes: 1, + }); + db.topicPendingDelete(dbCtx, topicId); + assert(db.statement.topicDeleteById.run.called); + }); + it('does not delete non-deleted topic', async function () { + db.statement.topicGetById.get.returns({ + id: topicId, + isDeleted: false, + }); + db.statement.subscriptionCountByTopicUrl.get.returns({ + count: 0, + }); + db.statement.topicDeleteById.run.returns({ + changes: 1, + }); + db.topicPendingDelete(dbCtx, topicId); + assert(!db.statement.topicDeleteById.run.called); + }); + it('does not delete topic with active subscriptions', async function () { + db.statement.topicGetById.get.returns({ + id: topicId, + isDeleted: true, + }); + db.statement.subscriptionCountByTopicUrl.get.returns({ + count: 10, + }); + db.statement.topicDeleteById.run.returns({ + changes: 1, + }); + db.topicPendingDelete(dbCtx, topicId); + assert(!db.statement.topicDeleteById.run.called); + }); + it('covers no deletion', async function () { + db.statement.topicGetById.get.returns({ + id: topicId, + isDeleted: true, + }); + db.statement.subscriptionCountByTopicUrl.get.returns({ + count: 0, + }); + db.statement.topicDeleteById.run.returns({ + changes: 0, + }); + try { + db.topicPendingDelete(dbCtx, topicId); + assert.fail(noExpectedException); + + } catch (e) { + assert(e instanceof DBErrors.UnexpectedResult); + } + assert(db.statement.topicDeleteById.run.called); + }); + }); + describe('topicSet', function () { let data; beforeEach(function () { diff --git a/test/stub-db.js b/test/stub-db.js index 5ef2422..f257cbf 100644 --- a/test/stub-db.js +++ b/test/stub-db.js @@ -17,6 +17,7 @@ const stubFns = [ 'subscriptionsByTopicId', 'subscriptionCountByTopicUrl', 'subscriptionDelete', + 'subscriptionDeleteExpired', 'subscriptionDeliveryClaim', 'subscriptionDeliveryClaimById', 'subscriptionDeliveryComplete', @@ -36,6 +37,7 @@ const stubFns = [ 'topicGetById', 'topicGetByUrl', 'topicGetContentById', + 'topicPendingDelete', 'topicSet', 'topicSetContent', 'topicUpdate',