expired subscriptions and deleted topics with no subscribers are now removed from...
authorJustin Wind <justin.wind+git@gmail.com>
Sat, 21 Aug 2021 21:51:47 +0000 (14:51 -0700)
committerJustin Wind <justin.wind+git@gmail.com>
Mon, 23 Aug 2021 22:36:19 +0000 (15:36 -0700)
Expired subscriptions are removed when a topic is updated, and topics
set to deleted state are removed when the last subscriber is notified.

15 files changed:
CHANGELOG.md
src/communication.js
src/db/base.js
src/db/postgres/index.js
src/db/postgres/sql/subscription-delete-expired.sql [new file with mode: 0644]
src/db/postgres/sql/topic-delete-by-id.sql [new file with mode: 0644]
src/db/sqlite/index.js
src/db/sqlite/sql/subscription-delete-expired.sql [new file with mode: 0644]
src/db/sqlite/sql/topic-delete-by-id.sql [new file with mode: 0644]
src/manager.js
test/src/communication.js
test/src/db/integration.js
test/src/db/postgres.js
test/src/db/sqlite.js
test/stub-db.js

index a8521deb25012489398fc126cfca748525f6254d..c14c493d1e098eedde9dcc90dca6a4d3b581acef 100644 (file)
@@ -7,6 +7,8 @@ Releases and notable changes to this project are documented here.
 ### Added
 
 - Accept multiple topics in publish requests.
+- Expired subscription entries are removed from the database when their topics are updated.
+- Topics which have been marked deleted are removed from the database after all subscribers have been notified.
 
 ## [v1.1.5] - 2021-08-23
 
index 345714f328951f4f7964249a8884882390bb4abc..097da637bd418d4d655f04aa65651ad5df709e5d 100644 (file)
@@ -223,6 +223,7 @@ class Communication {
     }
 
     if (!topic.isActive) {
+      // These should be filtered out when selecting verification tasks to process.
       this.logger.debug(_scope, 'topic not active, skipping verification', { verification, requestId });
       await this.db.verificationRelease(dbCtx, verificationId);
       return;
@@ -328,11 +329,19 @@ class Communication {
         case Enum.Mode.Unsubscribe:
           if (verificationAccepted) {
             await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
+            if (topic.isDeleted) {
+              // Remove a deleted topic after the last subscription is notified.
+              await this.db.topicPendingDelete(txCtx, topic.id);
+            }
           }
           break;
 
         case Enum.Mode.Denied:
           await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
+          if (topic.isDeleted) {
+            // Remove a deleted topic after he last subscription is notified.
+            await this.db.topicPendingDelete(txCtx, topic.id);
+          }
           break;
 
         default:
@@ -431,6 +440,9 @@ class Communication {
       throw new Errors.InternalInconsistencyError('no such topic id');
     }
 
+    // Cull any expired subscriptions
+    await this.db.subscriptionDeleteExpired(dbCtx, topicId);
+
     logInfoData.url = topicId.url;
 
     if (topic.isDeleted) {
@@ -479,13 +491,15 @@ class Communication {
 
     const validHub = await this.linkHelper.validHub(topic.url, response.headers, response.data);
     if (!validHub) {
-      this.logger.debug(_scope, 'retrieved topic does not list us as hub', { logInfoData });
+      this.logger.info(_scope, 'retrieved topic does not list us as hub', { logInfoData });
       if (this.options.communication.strictTopicHubLink) {
         await this.db.transaction(dbCtx, async (txCtx) => {
           // Set as deleted and set content_updated so subscriptions are notified.
           await this.db.topicDeleted(txCtx, topicId);
           await this.db.topicFetchComplete(txCtx, topicId);
         });
+        // Attempt to remove from db, if no active subscriptions.
+        await this.db.topicPendingDelete(dbCtx, topicId);
         return;
       }
     }
index 95c901068092eb93ea8218cca55e775b1ffa5f10..de0cd44a1b9c5a77eb0e8fcb22771ffbf7c488c1 100644 (file)
@@ -351,6 +351,16 @@ class Database {
   }
 
 
+  /**
+   * Remove any expired subscriptions to a topic.
+   * @param {*} dbCtx
+   * @param {*} topicId
+   */
+  async subscriptionDeleteExpired(dbCtx, topicId) {
+    this._notImplemented('subscriptionDeleteExpired', arguments);
+  }
+
+
   /**
    * Claim subscriptions needing content updates attempted.
    * @param {*} dbCtx 
@@ -533,6 +543,7 @@ class Database {
     this._notImplemented('topicGetAll', arguments);
   }
 
+
   /**
    * Get topic data, without content.
    * @param {*} dbCtx 
@@ -563,14 +574,15 @@ class Database {
     this._notImplemented('topicGetContentById', arguments);
   }
 
-  // /**
-  //  * Call after an unsubscribe, to check if a topic is awaiting deletion, and that
-  //  * was the last subscription belaying it.
-  //  * @param {String|Integer} data topic url or id
-  //  */
-  // async topicPendingDelete(dbCtx, data) {
-  //   this._notImplemented('topicPendingDelete', arguments);
-  // }
+
+  /**
+   * Attempt to delete a topic, which must be set isDeleted, if there
+   * are no more subscriptions belaying its removal.
+   * @param {*} topicId
+   */
+  async topicPendingDelete(dbCtx, topicId) {
+    this._notImplemented('topicPendingDelete', arguments);
+  }
 
 
   /**
index 255909780fe71c41133fc4b29c30eb5ec563f27a..213fa1031c3c44fd29c48a46a8af86ff8bed6217 100644 (file)
@@ -421,6 +421,21 @@ class DatabasePostgres extends Database {
   }
 
 
+  async subscriptionDeleteExpired(dbCtx, topicId) {
+    const _scope = _fileScope('subscriptionDeleteExpired');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    try {
+      const result = await dbCtx.result(this.statement.subscriptionDeleteExpired, { topicId });
+      this.logger.debug(_scope, 'success', { topicId, deleted: result.rowCount });
+      return this._engineInfo(result);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topicId });
+      throw e;
+    }
+  }
+
+
   async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
     const _scope = _fileScope('subscriptionDeliveryClaim');
     this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
@@ -804,6 +819,37 @@ class DatabasePostgres extends Database {
   }
 
 
+  async topicPendingDelete(dbCtx, topicId) {
+    const _scope = _fileScope('topicPendingDelete');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    try {
+      await dbCtx.txIf(async (txCtx) => {
+        const topic = await txCtx.one(this.statement.topicGetById, { topicId });
+        if (!topic.isDeleted) {
+          this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
+          return;
+        }
+
+        const { count: subscriberCount } = await txCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl: topic.url });
+        if (subscriberCount) {
+          this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
+          return;
+        }
+
+        const result = await txCtx.result(this.statement.topicDeleteById, { topicId });
+        if (result.rowCount !== 1) {
+          throw new DBErrors.UnexpectedResult('did not delete topic');
+        }
+      });
+      this.logger.debug(_scope, 'success', { topicId });
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topicId });
+      throw e;
+    }
+  }
+
+
   async topicSet(dbCtx, data) {
     const _scope = _fileScope('topicSet');
     this.logger.debug(_scope, 'called', data);
diff --git a/src/db/postgres/sql/subscription-delete-expired.sql b/src/db/postgres/sql/subscription-delete-expired.sql
new file mode 100644 (file)
index 0000000..d0f96e7
--- /dev/null
@@ -0,0 +1,4 @@
+--
+DELETE FROM subscription
+WHERE topic_id = $(topicId) AND expires < now()
+
diff --git a/src/db/postgres/sql/topic-delete-by-id.sql b/src/db/postgres/sql/topic-delete-by-id.sql
new file mode 100644 (file)
index 0000000..1378016
--- /dev/null
@@ -0,0 +1,4 @@
+--
+DELETE FROM topic
+WHERE id = $(topicId)
+
index fba4e7ca326f828a6f9698258d2ec4628fb7665e..07d6633311113dcdaf2eae05dabcc912f94fdece 100644 (file)
@@ -359,6 +359,21 @@ class DatabaseSQLite extends Database {
   }
 
 
+  subscriptionDeleteExpired(dbCtx, topicId) {
+    const _scope = _fileScope('subscriptionDeleteExpired');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    try {
+      const result = this.statement.subscriptionDeleteExpired.run({ topicId });
+      this.logger.debug(_scope, 'success', { topicId, deleted: result.changes });
+      return this._engineInfo(result);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topicId });
+      throw e;
+    }
+  }
+
+
   subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
     const _scope = _fileScope('subscriptionDeliveryClaim');
     this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
@@ -768,6 +783,37 @@ class DatabaseSQLite extends Database {
   }
 
 
+  topicPendingDelete(dbCtx, topicId) {
+    const _scope = _fileScope('topicPendingDelete');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    try {
+      this.db.transaction(() => {
+        const topic = this.statement.topicGetById.get({ topicId });
+        if (!topic.isDeleted) {
+          this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
+          return;
+        }
+
+        const { count: subscriberCount } = this.statement.subscriptionCountByTopicUrl.get({ topicUrl: topic.url });
+        if (subscriberCount) {
+          this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
+          return;
+        }
+
+        const result = this.statement.topicDeleteById.run({ topicId });
+        if (result.changes !== 1) {
+          throw new DBErrors.UnexpectedResult('did not delete topic');
+        }
+      })();
+      this.logger.debug(_scope, 'success', { topicId });
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topicId });
+      throw e;
+    }
+  }
+
+
   topicSet(dbCtx, data) {
     const _scope = _fileScope('topicSet');
     this.logger.debug(_scope, 'called', data);
diff --git a/src/db/sqlite/sql/subscription-delete-expired.sql b/src/db/sqlite/sql/subscription-delete-expired.sql
new file mode 100644 (file)
index 0000000..fae9d2c
--- /dev/null
@@ -0,0 +1,3 @@
+--
+DELETE FROM subscription
+WHERE topic_id = :topicId AND expires < strftime('%s', 'now')
diff --git a/src/db/sqlite/sql/topic-delete-by-id.sql b/src/db/sqlite/sql/topic-delete-by-id.sql
new file mode 100644 (file)
index 0000000..a66364a
--- /dev/null
@@ -0,0 +1,4 @@
+--
+DELETE FROM topic
+WHERE id = :topicId
+
index 0f110682eb2b3f94ddd1fe0076dab22e620ce69a..cae9e74ce734b6fec771f4c9ceb3927c8e1b9028 100644 (file)
@@ -626,6 +626,8 @@ class Manager {
           await this.db.topicDeleted(txCtx, topicId);
           res.end();
           this.logger.info(_scope, 'topic set deleted', { ctx, topicId });
+          // Attempt to remove from db if no active subscriptions.
+          await this.db.topicPendingDelete(txCtx, topicId);
           return;
         }
 
index ca5f34e92c536458171c18fa8cd4f78976a5bf11..960a0f994b27a572236a92131638b326c59f4ece 100644 (file)
@@ -405,6 +405,23 @@ describe('Communication', function () {
       assert(communication.db.verificationComplete.called);
     });
 
+    it('unsubscription from deleted topic deletes topic', async function () {
+      communication.db.verificationGetById.restore();
+      verification.mode = 'unsubscribe';
+      sinon.stub(communication.db, 'verificationGetById').resolves(verification);
+      communication.db.topicGetById.restore();
+      sinon.stub(communication.db, 'topicGetById').resolves({
+        ...topic,
+        isDeleted: true,
+      });
+
+      await communication.verificationProcess(dbCtx, callback, topicId, requestId);
+
+      assert(communication.db.subscriptionDelete.called);
+      assert(communication.db.verificationComplete.called);
+      assert(communication.db.topicPendingDelete.called);
+    });
+
     it('unsubscription denial succeeds', async function () {
       communication.db.verificationGetById.restore();
       verification.mode = 'unsubscribe';
index e6f632c897280586c404ebe2962fc747e08e7923..8d5b61553d24f3b72e4ec6d39fd73534de7617cd 100644 (file)
@@ -114,6 +114,7 @@ describe('Database Integration', function () {
       }); // Authentication
 
       describe('Topic', function () {
+        let anotherTopicId;
         step('requires data', async function () {
           try {
             await db.context(async (dbCtx) => {
@@ -222,7 +223,7 @@ describe('Database Integration', function () {
         step('deletes a topic', async function () {
           await db.context(async (dbCtx) => {
             const result = await db.topicSet(dbCtx, testData.anotherTopicSet);
-            const anotherTopicId = result.lastInsertRowid;
+            anotherTopicId = result.lastInsertRowid;
             await db.topicDeleted(dbCtx, anotherTopicId);
             const topic = await db.topicGetById(dbCtx, anotherTopicId);
             assert.strictEqual(topic.isDeleted, true);
@@ -231,7 +232,7 @@ describe('Database Integration', function () {
         step('update un-deletes a topic', async function () {
           await db.context(async (dbCtx) => {
             const result = await db.topicSet(dbCtx, testData.anotherTopicSet);
-            const anotherTopicId = result.lastInsertRowid;
+            assert.strictEqual(result.lastInsertRowid, anotherTopicId);
             const topic = await db.topicGetById(dbCtx, anotherTopicId);
             assert.strictEqual(topic.isDeleted, false);
           });
@@ -242,6 +243,15 @@ describe('Database Integration', function () {
             assert(topics.length);
           });
         });
+        // pending delete of deleted topic with no subscriptions
+        step('really deletes unsubscribed deleted topic', async function() {
+          await db.context(async (dbCtx) => {
+            await db.topicDeleted(dbCtx, anotherTopicId);
+            await db.topicPendingDelete(dbCtx, anotherTopicId);
+            const topic = await db.topicGetById(dbCtx, anotherTopicId);
+            assert(!topic);
+          });
+        });
       }); // Topic
 
       describe('Subscription', function () {
@@ -372,6 +382,28 @@ describe('Database Integration', function () {
             assert(!subscription);
           });
         });
+        step('create expired subscription', async function () {
+          const data = {
+            ...testData.subscriptionUpsert,
+            secret: 'newSecret',
+            topicId,
+            leaseSeconds: -1,
+          };
+          await db.context(async (dbCtx) => {
+            const result = await db.subscriptionUpsert(dbCtx, data);
+            assert(result.lastInsertRowid);
+            assert.notStrictEqual(result.lastInsertRowid, subscriptionId);
+            subscriptionId = result.lastInsertRowid;
+            assert.strictEqual(result.changes, 1);
+          });
+        });
+        step('delete expired subscriptions', async function() {
+          await db.context(async (dbCtx) => {
+            await db.subscriptionDeleteExpired(dbCtx, topicId)
+            const subscription = await db.subscriptionGet(dbCtx, testData.subscriptionUpsert.callback, topicId);
+            assert(!subscription);
+          });
+        });
       }); // Subscription
 
       describe('Verification', function () {
index ef4790542cd147a076a0fdc11985da6641f10b64..0f037d6fae91b6dd4eed3098626317c9797f9d79 100644 (file)
@@ -557,7 +557,7 @@ describe('DatabasePostgres', function () {
         changes: 1,
         lastInsertRowid: undefined,
         duration: 10,
-      }
+      };
       sinon.stub(db.db, 'result').resolves(dbResult);
       const result = await db.subscriptionDelete(dbCtx, callback, topicId);
       assert.deepStrictEqual(result, expected);
@@ -574,6 +574,34 @@ describe('DatabasePostgres', function () {
     });
   }); // subscriptionDelete
 
+  describe('subscriptionDeleteExpired', function () {
+    it('success', async function () {
+      const dbResult = {
+        rowCount: 1,
+        rows: [],
+        duration: 10,
+      };
+      const expected = {
+        changes: 1,
+        lastInsertRowid: undefined,
+        duration: 10,
+      };
+      sinon.stub(db.db, 'result').resolves(dbResult);
+      const result = await db.subscriptionDeleteExpired(dbCtx, topicId);
+      assert.deepStrictEqual(result, expected);
+    });
+    it('failure', async function() {
+      const expected = new Error();
+      sinon.stub(db.db, 'result').rejects(expected);
+      try {
+        await db.subscriptionDeleteExpired(dbCtx, topicId);
+        assert.fail(noExpectedException);
+      } catch (e) {
+        assert.deepStrictEqual(e, expected);
+      }
+    });
+  });
+
   describe('subscriptionDeliveryClaim', function () {
     it('success', async function() {
       const dbResult = [
@@ -1214,6 +1242,69 @@ describe('DatabasePostgres', function () {
     });
   }); // topicGetContentById
 
+  describe('topicPendingDelete', function () {
+    beforeEach(function () {
+      sinon.stub(db.db, 'one');
+      sinon.stub(db.db, 'result');
+    });
+    it('success', async function () {
+      db.db.one.onCall(0).resolves({
+        id: topicId,
+        isDeleted: true,
+      }).onCall(1).resolves({
+        count: 0,
+      });
+      const dbResult = {
+        rowCount: 1,
+        rows: [],
+        duration: 10,
+      };
+      db.db.result.resolves(dbResult);
+      await db.topicPendingDelete(dbCtx, topicId);
+      assert(db.db.result.called);
+    });
+    it('does not delete non-deleted topic', async function () {
+      db.db.one.onCall(0).resolves({
+        id: topicId,
+        isDeleted: false,
+      }).onCall(1).resolves({
+        count: 0,
+      });
+      await db.topicPendingDelete(dbCtx, topicId);
+      assert(!db.db.result.called);
+    });
+    it('does not delete topic with active subscriptions', async function () {
+      db.db.one.onCall(0).resolves({
+        id: topicId,
+        isDeleted: true,
+      }).onCall(1).resolves({
+        count: 10,
+      });
+      await db.topicPendingDelete(dbCtx, topicId);
+      assert(!db.db.result.called);
+    });
+    it('covers no deletion', async function () {
+      db.db.one.onCall(0).resolves({
+        id: topicId,
+        isDeleted: true,
+      }).onCall(1).resolves({
+        count: 0,
+      });
+      const dbResult = {
+        rowCount: 0,
+        rows: [],
+        duration: 10,
+      };
+      db.db.result.resolves(dbResult);
+      try {
+        await db.topicPendingDelete(dbCtx, topicId);
+        assert.fail(noExpectedException);
+      } catch (e) {
+        assert(e instanceof DBErrors.UnexpectedResult);
+      }
+    });
+  });
+
   describe('topicSet', function () {
     let data;
     beforeEach(function () {
index 0c96df364d1d4d5ecfb98b6de6e2a021a3c92a05..370d559909cfdfaa1bb2eb7020584e3f8e0eb7d1 100644 (file)
@@ -404,8 +404,34 @@ describe('DatabaseSQLite', function () {
     });
   }); // subscriptionDelete
 
+  describe('subscriptionDeleteExpired', function () {
+    it('success', async function () {
+      const dbResult = {
+        changes: 1,
+        lastInsertRowid: undefined,
+      };
+      const expected = {
+        changes: 1,
+        lastInsertRowid: undefined,
+      };
+      sinon.stub(db.statement.subscriptionDeleteExpired, 'run').returns(dbResult);
+      const result = await db.subscriptionDeleteExpired(dbCtx, topicId);
+      assert.deepStrictEqual(result, expected);
+    });
+    it('failure', async function () {
+      const expected = new Error();
+      sinon.stub(db.statement.subscriptionDeleteExpired, 'run').throws(expected);
+      try {
+        await db.subscriptionDeleteExpired(dbCtx, topicId);
+        assert.fail(noExpectedException);
+      } catch (e) {
+        assert.deepStrictEqual(e, expected);
+      }
+    });
+  });
+
   describe('subscriptionDeliveryClaim', function () {
-    it('success', async function() {
+    it('success', async function () {
       const dbAllResult = [
         {
           id: 'c2e254c5-aa6e-4a8f-b1a1-e474b07392bb',
@@ -1021,6 +1047,76 @@ describe('DatabaseSQLite', function () {
     });
   }); // topicGetContentById
 
+  describe('topicPendingDelete', function () {
+    beforeEach(function () {
+      sinon.stub(db.statement.topicGetById, 'get');
+      sinon.stub(db.statement.subscriptionCountByTopicUrl, 'get');
+      sinon.stub(db.statement.topicDeleteById, 'run');
+    });
+    it('success', async function () {
+      db.statement.topicGetById.get.returns({
+        id: topicId,
+        isDeleted: true,
+      });
+      db.statement.subscriptionCountByTopicUrl.get.returns({
+        count: 0,
+      });
+      db.statement.topicDeleteById.run.returns({
+        changes: 1,
+      });
+      db.topicPendingDelete(dbCtx, topicId);
+      assert(db.statement.topicDeleteById.run.called);
+    });
+    it('does not delete non-deleted topic', async function () {
+      db.statement.topicGetById.get.returns({
+        id: topicId,
+        isDeleted: false,
+      });
+      db.statement.subscriptionCountByTopicUrl.get.returns({
+        count: 0,
+      });
+      db.statement.topicDeleteById.run.returns({
+        changes: 1,
+      });
+      db.topicPendingDelete(dbCtx, topicId);
+      assert(!db.statement.topicDeleteById.run.called);
+    });
+    it('does not delete topic with active subscriptions', async function () {
+      db.statement.topicGetById.get.returns({
+        id: topicId,
+        isDeleted: true,
+      });
+      db.statement.subscriptionCountByTopicUrl.get.returns({
+        count: 10,
+      });
+      db.statement.topicDeleteById.run.returns({
+        changes: 1,
+      });
+      db.topicPendingDelete(dbCtx, topicId);
+      assert(!db.statement.topicDeleteById.run.called);
+    });
+    it('covers no deletion', async function () {
+      db.statement.topicGetById.get.returns({
+        id: topicId,
+        isDeleted: true,
+      });
+      db.statement.subscriptionCountByTopicUrl.get.returns({
+        count: 0,
+      });
+      db.statement.topicDeleteById.run.returns({
+        changes: 0,
+      });
+      try {
+        db.topicPendingDelete(dbCtx, topicId);
+        assert.fail(noExpectedException);
+
+      } catch (e) {
+        assert(e instanceof DBErrors.UnexpectedResult);
+      }
+      assert(db.statement.topicDeleteById.run.called);
+    });
+  });
+
   describe('topicSet', function () {
     let data;
     beforeEach(function () {
index 5ef24228a8b5bf1db00257cf1745572d5c3ef742..f257cbfeeb0728af0b9b1bd135e49e7bb505d4c2 100644 (file)
@@ -17,6 +17,7 @@ const stubFns = [
   'subscriptionsByTopicId',
   'subscriptionCountByTopicUrl',
   'subscriptionDelete',
+  'subscriptionDeleteExpired',
   'subscriptionDeliveryClaim',
   'subscriptionDeliveryClaimById',
   'subscriptionDeliveryComplete',
@@ -36,6 +37,7 @@ const stubFns = [
   'topicGetById',
   'topicGetByUrl',
   'topicGetContentById',
+  'topicPendingDelete',
   'topicSet',
   'topicSetContent',
   'topicUpdate',