db migration 1.0.2, now stores and indexes date of content delivered to subscriber...
authorJustin Wind <justin.wind+git@gmail.com>
Wed, 9 Feb 2022 23:57:46 +0000 (15:57 -0800)
committerJustin Wind <justin.wind+git@gmail.com>
Thu, 10 Feb 2022 00:52:33 +0000 (16:52 -0800)
12 files changed:
src/communication.js
src/db/postgres/index.js
src/db/postgres/sql/schema/1.0.2/apply.sql [new file with mode: 0644]
src/db/postgres/sql/schema/1.0.2/revert.sql [new file with mode: 0644]
src/db/postgres/sql/subscription-delivery-success.sql
src/db/sqlite/index.js
src/db/sqlite/sql/schema/1.0.2/apply.sql [new file with mode: 0644]
src/db/sqlite/sql/schema/1.0.2/revert.sql [new file with mode: 0644]
src/db/sqlite/sql/subscription-delivery-success.sql
test/src/db/integration.js
test/src/db/postgres.js
test/src/db/sqlite.js

index 03cc670dd7154ccc64a50b87f4d7bb1d5eddafb3..dc4d464c70111b4ea2a8971575940c099d14b379 100644 (file)
@@ -565,7 +565,7 @@ class Communication {
 
       await this.db.transaction(dbCtx, async (txCtx) => {
         await this.db.verificationInsert(txCtx, verification);
-        await this.db.subscriptionDeliveryComplete(txCtx, subscription.callback, subscription.topicId);
+        await this.db.subscriptionDeliveryComplete(txCtx, subscription.callback, subscription.topicId, topic.contentUpdated);
       });
       this.logger.info(_scope, 'update unsubscription for deleted topic', logInfoData);
       return;
@@ -617,7 +617,7 @@ class Communication {
         return;
     }
 
-    await this.db.subscriptionDeliveryComplete(dbCtx, subscription.callback, subscription.topicId);
+    await this.db.subscriptionDeliveryComplete(dbCtx, subscription.callback, subscription.topicId, topic.contentUpdated);
     this.logger.info(_scope, 'update success', logInfoData);
   }
 
index 213fa1031c3c44fd29c48a46a8af86ff8bed6217..a90e0cbb4277c02ae2a35d16ef68d9e3434ca6ed 100644 (file)
@@ -30,7 +30,7 @@ const schemaVersionsSupported = {
   max: {
     major: 1,
     minor: 0,
-    patch: 1,
+    patch: 2,
   },
 };
 
@@ -162,10 +162,16 @@ class DatabasePostgres extends Database {
     this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
     for (const v of migrationsWanted) {
       const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
-      const migrationSql = _queryFile(fPath);
-      const results = await this.db.multiResult(migrationSql);
-      this.logger.debug(_scope, 'executed migration sql', { version: v, results });
-      this.logger.info(_scope, 'applied migration', { version: v });
+      try {
+        const migrationSql = _queryFile(fPath);
+        this.logger.debug(_scope, 'applying migration', { version: v });
+        const results = await this.db.multiResult(migrationSql);
+        this.logger.debug(_scope, 'migration results', { results });
+        this.logger.info(_scope, 'applied migration', { version: v });
+      } catch (e) {
+        this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v });
+        throw e;
+      }
     }
   }
 
@@ -473,14 +479,14 @@ class DatabasePostgres extends Database {
   }
 
 
-  async subscriptionDeliveryComplete(dbCtx, callback, topicId) {
+  async subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) {
     const _scope = _fileScope('subscriptionDeliveryComplete');
-    this.logger.debug(_scope, 'called', { callback, topicId });
+    this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated });
 
     let result;
     try {
       await dbCtx.txIf(async (txCtx) => {
-        result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId });
+        result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId, topicContentUpdated });
         if (result.rowCount != 1) {
           throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
         }
@@ -490,7 +496,7 @@ class DatabasePostgres extends Database {
         }
       });
     } catch (e) {
-      this.logger.error(_scope, 'failed', { error: e, callback, topicId });
+      this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated });
       throw e;
     }
   }
diff --git a/src/db/postgres/sql/schema/1.0.2/apply.sql b/src/db/postgres/sql/schema/1.0.2/apply.sql
new file mode 100644 (file)
index 0000000..6da0924
--- /dev/null
@@ -0,0 +1,28 @@
+BEGIN;
+       -- track when content was delivered as separate from latest content delivered
+       -- content_delivered continues to be the time the content was delivered, but becomes only informational
+       -- latest_content_delivered is date on topic content delivered
+       ALTER TABLE subscription
+               ADD COLUMN latest_content_delivered TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT '-infinity'::timestamptz
+       ;
+       CREATE INDEX subscription_latest_content_delivered_idx ON subscription(latest_content_delivered);
+       -- migrate existing values
+       UPDATE subscription SET latest_content_delivered = content_delivered;
+       -- no need for this index
+       DROP INDEX subscription_content_delivered_idx;
+       -- update the view to use latest_cotnent_delivered
+       CREATE OR REPLACE VIEW subscription_delivery_needed AS
+               SELECT s.*
+               FROM subscription s JOIN topic t ON s.topic_id = t.id
+               WHERE
+                       s.expires > now()
+               AND
+                       s.latest_content_delivered < t.content_updated
+               AND
+                       s.delivery_next_attempt < now()
+               AND
+                       s.id NOT IN (SELECT id FROM subscription_delivery_in_progress_active)
+       ;
+
+       INSERT INTO _meta_schema_version (major, minor, patch) VALUES (1, 0, 2);
+COMMIT;
diff --git a/src/db/postgres/sql/schema/1.0.2/revert.sql b/src/db/postgres/sql/schema/1.0.2/revert.sql
new file mode 100644 (file)
index 0000000..2752e6d
--- /dev/null
@@ -0,0 +1,21 @@
+BEGIN;
+       DROP INDEX subscription_latest_content_delivered_idx;
+       CREATE INDEX subscription_content_delivered_idx ON subscription(content_delivered);
+       DROP VIEW subscription_delivery_needed;
+       ALTER TABLE subscription
+               DROP COLUMN latest_content_delivered
+       ;
+       CREATE OR REPLACE VIEW subscription_delivery_needed AS
+               SELECT s.*
+               FROM subscription s JOIN topic t ON s.topic_id = t.id
+               WHERE
+                       s.expires > now()
+               AND
+                       s.content_delivered < t.content_updated
+               AND
+                       s.delivery_next_attempt < now()
+               AND
+                       s.id NOT IN (SELECT id FROM subscription_delivery_in_progress_active)
+       ;
+       DELETE FROM _meta_schema_version WHERE major = 1 AND minor = 0 AND patch = 2;
+COMMIT;
index 9ff01e0395ef7c689b3ae6d1af33d586d77b683a..b19b376593f54405e2a6ef7532bcef912d0f1ebe 100644 (file)
@@ -1,6 +1,7 @@
 --
 UPDATE subscription SET
        content_delivered = now(),
+       latest_content_delivered = $(topicContentUpdated),
        delivery_attempts_since_success = 0,
        delivery_next_attempt = '-infinity'::timestamptz
 WHERE
index 07d6633311113dcdaf2eae05dabcc912f94fdece..3fae300eec6b1d02a746f32aae1b3d978bf35fda 100644 (file)
@@ -20,12 +20,14 @@ const schemaVersionsSupported = {
   max: {
     major: 1,
     minor: 0,
-    patch: 1,
+    patch: 2,
   },
 };
 
 // max of signed int64 (2^63 - 1), should be enough
 const EPOCH_FOREVER = BigInt('9223372036854775807');
+const epochToDate = (epoch) => new Date(Number(epoch) * 1000);
+const dateToEpoch = (date) => Math.round(date.getTime() / 1000);
 
 class DatabaseSQLite extends Database {
   constructor(logger, options) {
@@ -85,10 +87,17 @@ class DatabaseSQLite extends Database {
     this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
     migrationsWanted.forEach((v) => {
       const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
-      // eslint-disable-next-line security/detect-non-literal-fs-filename
-      const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
-      this.logger.info(_scope, 'applying migration', { version: v });
-      this.db.exec(fSql);
+      try {
+        // eslint-disable-next-line security/detect-non-literal-fs-filename
+        const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
+        this.logger.debug(_scope, 'applying migration', { version: v });
+        const results = this.db.exec(fSql);
+        this.logger.debug(_scope, 'migration results', { results });
+        this.logger.info(_scope, 'applied migration', { version: v });
+      } catch (e) {
+        this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v });
+        throw e;
+      }
     });
   }
 
@@ -304,7 +313,6 @@ class DatabaseSQLite extends Database {
    * @param {Object} data
    */
   static _subscriptionDataToNative(data) {
-    const epochToDate = (epoch) => new Date(Number(epoch) * 1000);
     if (data) {
       ['created', 'verified', 'expires', 'contentDelivered'].forEach((field) => {
         // eslint-disable-next-line security/detect-object-injection
@@ -414,14 +422,15 @@ class DatabaseSQLite extends Database {
   }
 
 
-  subscriptionDeliveryComplete(dbCtx, callback, topicId) {
+  subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) {
     const _scope = _fileScope('subscriptionDeliveryComplete');
-    this.logger.debug(_scope, 'called', { callback, topicId });
+    this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated });
 
     let result;
     try {
       this.db.transaction(() => {
-        result = this.statement.subscriptionDeliverySuccess.run({ callback, topicId });
+        topicContentUpdated = dateToEpoch(topicContentUpdated);
+        result = this.statement.subscriptionDeliverySuccess.run({ callback, topicId, topicContentUpdated });
         if (result.changes != 1) {
           throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
         }
@@ -432,7 +441,7 @@ class DatabaseSQLite extends Database {
       })();
       return this._engineInfo(result);
     } catch (e) {
-      this.logger.error(_scope, 'failed', { error: e, callback, topicId });
+      this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated });
       throw e;
     }
   }
@@ -698,7 +707,6 @@ class DatabaseSQLite extends Database {
    * @param {Object} data
    */
   static _topicDataToNative(data) {
-    const epochToDate = (epoch) => new Date(Number(epoch) * 1000);
     if (data) {
       data.isActive = !!data.isActive;
       data.isDeleted = !!data.isDeleted;
diff --git a/src/db/sqlite/sql/schema/1.0.2/apply.sql b/src/db/sqlite/sql/schema/1.0.2/apply.sql
new file mode 100644 (file)
index 0000000..3a7598e
--- /dev/null
@@ -0,0 +1,29 @@
+BEGIN;
+       -- track when content was delivered as separate from latest content delivered
+       -- content_delivered continues to be the time the content was delivered, but becomes only informational
+       -- latest_content_delivered is date on topic content delivered
+       ALTER TABLE subscription
+               ADD COLUMN latest_content_delivered INTEGER NOT NULL DEFAULT 0
+       ;
+       CREATE INDEX subscription_latest_content_delivered_idx ON subscription(latest_content_delivered);
+       -- migrate existing values
+       UPDATE subscription SET latest_content_delivered = content_delivered;
+       -- no need for this index
+       DROP INDEX subscription_content_delivered_idx;
+       -- update the view to use latest_cotnent_delivered
+       DROP VIEW subscription_delivery_needed;
+       CREATE VIEW subscription_delivery_needed AS
+               SELECT s.*
+               FROM subscription s JOIN topic t ON s.topic_id = t.id
+               WHERE
+                       s.expires > strftime('%s', 'now')
+               AND
+                       s.latest_content_delivered < t.content_updated
+               AND
+                       s.delivery_next_attempt < strftime('%s', 'now')
+               AND
+                       s.id NOT IN (SELECT id FROM subscription_delivery_in_progress_active)
+       ;
+
+       INSERT INTO _meta_schema_version (major, minor, patch) VALUES (1, 0, 2);
+COMMIT;
diff --git a/src/db/sqlite/sql/schema/1.0.2/revert.sql b/src/db/sqlite/sql/schema/1.0.2/revert.sql
new file mode 100644 (file)
index 0000000..45d3ef1
--- /dev/null
@@ -0,0 +1,21 @@
+BEGIN;
+       DROP INDEX subscription_latest_content_delivered_idx;
+       DROP VIEW subscription_delivery_needed;
+       ALTER TABLE subscription
+               DROP COLUMN latest_content_delivered
+       ;
+       CREATE INDEX subscription_content_delivered_idx ON subscription(content_delivered);
+       CREATE VIEW subscription_delivery_needed AS
+               SELECT s.*
+               FROM subscription s JOIN topic t ON s.topic_id = t.id
+               WHERE
+                       s.expires > strftime('%s', 'now')
+               AND
+                       s.content_delivered < t.content_updated
+               AND
+                       s.delivery_next_attempt < strftime('%s', 'now')
+               AND
+                       s.id NOT IN (SELECT id FROM subscription_delivery_in_progress_active)
+       ;
+       DELETE FROM _meta_schema_version WHERE major = 1 AND minor = 0 AND patch = 2;
+COMMIT;
index 02b89e87e65328d25d1d2d83d9b743e36e4f1ebc..dba26322081e39a3c6e3d5035efe0cf5e2191f85 100644 (file)
@@ -1,6 +1,7 @@
 --
 UPDATE subscription SET
        content_delivered = strftime('%s', 'now'),
+       latest_content_delivered = :topicContentUpdated,
        delivery_attempts_since_success = 0,
        delivery_next_attempt = 0
 WHERE
index 8d5b61553d24f3b72e4ec6d39fd73534de7617cd..83feac761222a620793cb6963b9a2604e3fb60f5 100644 (file)
@@ -328,7 +328,8 @@ describe('Database Integration', function () {
         step('complete subscription', async function () {
           const { callback } = testData.subscriptionUpsert;
           await db.context(async (dbCtx) => {
-            await db.subscriptionDeliveryComplete(dbCtx, callback, topicId);
+            const topic = await db.topicGetById(dbCtx, topicId);
+            await db.subscriptionDeliveryComplete(dbCtx, callback, topicId, topic.contentUpdated);
             const subscription = await db.subscriptionGetById(dbCtx, subscriptionId);
             assert.strictEqual(Number(subscription.deliveryAttemptsSinceSuccess), 0);
           });
index 0f037d6fae91b6dd4eed3098626317c9797f9d79..4a0ecd98516372ecc1adada38595c1ce4c63fa61 100644 (file)
@@ -227,11 +227,24 @@ describe('DatabasePostgres', function () {
     });
     it('covers migration', async function() {
       sinon.stub(db.db, 'oneOrNone').resolves({});
-      sinon.stub(db.db, 'multiResult');
-      sinon.stub(db, '_currentSchema').resolves(db.schemaVersionsSupported.max);
+      sinon.stub(db.db, 'multiResult').resolves({});
+      sinon.stub(db, '_currentSchema').resolves(db.schemaVersionsSupported.min);
       sinon.stub(db.db, 'one').resolves(db.schemaVersionsSupported.max);
       await db.initialize();
     });
+    it('covers migration failure', async function() {
+      const expected = new Error('oh no');
+      sinon.stub(db.db, 'oneOrNone').resolves({});
+      sinon.stub(db.db, 'multiResult').rejects(expected);
+      sinon.stub(db, '_currentSchema').resolves(db.schemaVersionsSupported.min);
+      sinon.stub(db.db, 'one').resolves(db.schemaVersionsSupported.max);
+      try {
+        await db.initialize();
+        assert.fail(noExpectedException);
+      } catch (e) {
+        assert.deepStrictEqual(e, expected);
+      }
+    });
     it('covers listener', async function() {
       db.listener = {
         start: sinon.stub(),
@@ -659,12 +672,16 @@ describe('DatabasePostgres', function () {
   }); // subscriptionDeliveryClaimById
 
   describe('subscriptionDeliveryComplete', function () {
+    let topicContentUpdated;
+    before(function () {
+      topicContentUpdated = new Date();
+    });
     it('success', async function() {
       const dbResult = {
         rowCount: 1,
       };
       sinon.stub(db.db, 'result').resolves(dbResult);
-      await db.subscriptionDeliveryComplete(dbCtx, callback, topicId);
+      await db.subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated);
     });
     it('failure', async function () {
       const dbResult = {
@@ -672,7 +689,7 @@ describe('DatabasePostgres', function () {
       };
       sinon.stub(db.db, 'result').onCall(0).resolves(dbResult);
       try {
-        await db.subscriptionDeliveryComplete(dbCtx, callback, topicId);
+        await db.subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated);
         assert.fail(noExpectedException);
       } catch (e) {
         assert(e instanceof DBErrors.UnexpectedResult);
@@ -687,7 +704,7 @@ describe('DatabasePostgres', function () {
       };
       sinon.stub(db.db, 'result').onCall(0).resolves(dbResult0).onCall(1).resolves(dbResult1);
       try {
-        await db.subscriptionDeliveryComplete(dbCtx, callback, topicId);
+        await db.subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated);
         assert.fail(noExpectedException);
       } catch (e) {
         assert(e instanceof DBErrors.UnexpectedResult);
index 2af1262724a5269165ae07e18e5209b85e94a343..bd6612030b3cae08e5d1e1858e8234644033d598 100644 (file)
@@ -66,6 +66,38 @@ describe('DatabaseSQLite', function () {
     });
   }); // Implementation
 
+  describe('_initTables', function () {
+    let preparedGet;
+    beforeEach(function () {
+      preparedGet = sinon.stub();
+      sinon.stub(db.db, 'prepare').returns({
+        pluck: () => ({
+          bind: () => ({
+            get: preparedGet,
+          }),
+        }),
+      });
+      sinon.stub(db, '_currentSchema').returns(db.schemaVersionsSupported.min);
+      sinon.stub(db.db, 'exec');
+    });
+    it('covers migration', async function() {
+      preparedGet.returns({});
+      await db._initTables();
+      assert(db.db.exec.called);
+    });
+    it('covers migration failure', async function() {
+      const expected = new Error('oh no');
+      preparedGet.returns({});
+      db.db.exec.throws(expected);
+      try {
+        await db._initTables();
+        assert.fail(noExpectedException);
+      } catch (e) {
+        assert.deepStrictEqual(e, expected);
+      }
+    });
+  }); // _initTables
+
   describe('_currentSchema', function () {
     it('covers', async function () {
       const version = { major: 1, minor: 0, patch: 0 };
@@ -494,13 +526,17 @@ describe('DatabaseSQLite', function () {
   }); // subscriptionDeliveryClaimById
 
   describe('subscriptionDeliveryComplete', function () {
+    let topicContentUpdated;
+    before(function () {
+      topicContentUpdated = new Date();
+    });
     it('success', async function() {
       const dbResult = {
         changes: 1,
       };
       sinon.stub(db.statement.subscriptionDeliverySuccess, 'run').returns(dbResult);
       sinon.stub(db.statement.subscriptionDeliveryDone, 'run').returns(dbResult);
-      await db.subscriptionDeliveryComplete(dbCtx, callback, topicId);
+      await db.subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated);
     });
     it('failure', async function () {
       const dbResult = {
@@ -509,7 +545,7 @@ describe('DatabaseSQLite', function () {
       sinon.stub(db.statement.subscriptionDeliverySuccess, 'run').returns(dbResult);
       sinon.stub(db.statement.subscriptionDeliveryDone, 'run').returns(dbResult);
       try {
-        await db.subscriptionDeliveryComplete(dbCtx, callback, topicId);
+        await db.subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated);
         assert.fail(noExpectedException);
       } catch (e) {
         assert(e instanceof DBErrors.UnexpectedResult);
@@ -525,7 +561,7 @@ describe('DatabaseSQLite', function () {
       sinon.stub(db.statement.subscriptionDeliverySuccess, 'run').returns(dbResult0);
       sinon.stub(db.statement.subscriptionDeliveryDone, 'run').returns(dbResult1);
       try {
-        await db.subscriptionDeliveryComplete(dbCtx, callback, topicId);
+        await db.subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated);
         assert.fail(noExpectedException);
       } catch (e) {
         assert(e instanceof DBErrors.UnexpectedResult);