From d5e7908d3e60ee0cb3149163d4749563cdfafeb3 Mon Sep 17 00:00:00 2001 From: Justin Wind Date: Wed, 9 Feb 2022 15:57:46 -0800 Subject: [PATCH] db migration 1.0.2, now stores and indexes date of content delivered to subscriber, used in determining deliveries needed --- src/communication.js | 4 +- src/db/postgres/index.js | 24 +++++++---- src/db/postgres/sql/schema/1.0.2/apply.sql | 28 +++++++++++++ src/db/postgres/sql/schema/1.0.2/revert.sql | 21 ++++++++++ .../sql/subscription-delivery-success.sql | 1 + src/db/sqlite/index.js | 30 ++++++++----- src/db/sqlite/sql/schema/1.0.2/apply.sql | 29 +++++++++++++ src/db/sqlite/sql/schema/1.0.2/revert.sql | 21 ++++++++++ .../sql/subscription-delivery-success.sql | 1 + test/src/db/integration.js | 3 +- test/src/db/postgres.js | 27 +++++++++--- test/src/db/sqlite.js | 42 +++++++++++++++++-- 12 files changed, 200 insertions(+), 31 deletions(-) create mode 100644 src/db/postgres/sql/schema/1.0.2/apply.sql create mode 100644 src/db/postgres/sql/schema/1.0.2/revert.sql create mode 100644 src/db/sqlite/sql/schema/1.0.2/apply.sql create mode 100644 src/db/sqlite/sql/schema/1.0.2/revert.sql diff --git a/src/communication.js b/src/communication.js index 03cc670..dc4d464 100644 --- a/src/communication.js +++ b/src/communication.js @@ -565,7 +565,7 @@ class Communication { await this.db.transaction(dbCtx, async (txCtx) => { await this.db.verificationInsert(txCtx, verification); - await this.db.subscriptionDeliveryComplete(txCtx, subscription.callback, subscription.topicId); + await this.db.subscriptionDeliveryComplete(txCtx, subscription.callback, subscription.topicId, topic.contentUpdated); }); this.logger.info(_scope, 'update unsubscription for deleted topic', logInfoData); return; @@ -617,7 +617,7 @@ class Communication { return; } - await this.db.subscriptionDeliveryComplete(dbCtx, subscription.callback, subscription.topicId); + await this.db.subscriptionDeliveryComplete(dbCtx, subscription.callback, subscription.topicId, topic.contentUpdated); this.logger.info(_scope, 'update success', logInfoData); } diff --git a/src/db/postgres/index.js b/src/db/postgres/index.js index 213fa10..a90e0cb 100644 --- a/src/db/postgres/index.js +++ b/src/db/postgres/index.js @@ -30,7 +30,7 @@ const schemaVersionsSupported = { max: { major: 1, minor: 0, - patch: 1, + patch: 2, }, }; @@ -162,10 +162,16 @@ class DatabasePostgres extends Database { this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted }); for (const v of migrationsWanted) { const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql'); - const migrationSql = _queryFile(fPath); - const results = await this.db.multiResult(migrationSql); - this.logger.debug(_scope, 'executed migration sql', { version: v, results }); - this.logger.info(_scope, 'applied migration', { version: v }); + try { + const migrationSql = _queryFile(fPath); + this.logger.debug(_scope, 'applying migration', { version: v }); + const results = await this.db.multiResult(migrationSql); + this.logger.debug(_scope, 'migration results', { results }); + this.logger.info(_scope, 'applied migration', { version: v }); + } catch (e) { + this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v }); + throw e; + } } } @@ -473,14 +479,14 @@ class DatabasePostgres extends Database { } - async subscriptionDeliveryComplete(dbCtx, callback, topicId) { + async subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) { const _scope = _fileScope('subscriptionDeliveryComplete'); - this.logger.debug(_scope, 'called', { callback, topicId }); + this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated }); let result; try { await dbCtx.txIf(async (txCtx) => { - result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId }); + result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId, topicContentUpdated }); if (result.rowCount != 1) { throw new DBErrors.UnexpectedResult('did not set subscription delivery success'); } @@ -490,7 +496,7 @@ class DatabasePostgres extends Database { } }); } catch (e) { - this.logger.error(_scope, 'failed', { error: e, callback, topicId }); + this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated }); throw e; } } diff --git a/src/db/postgres/sql/schema/1.0.2/apply.sql b/src/db/postgres/sql/schema/1.0.2/apply.sql new file mode 100644 index 0000000..6da0924 --- /dev/null +++ b/src/db/postgres/sql/schema/1.0.2/apply.sql @@ -0,0 +1,28 @@ +BEGIN; + -- track when content was delivered as separate from latest content delivered + -- content_delivered continues to be the time the content was delivered, but becomes only informational + -- latest_content_delivered is date on topic content delivered + ALTER TABLE subscription + ADD COLUMN latest_content_delivered TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT '-infinity'::timestamptz + ; + CREATE INDEX subscription_latest_content_delivered_idx ON subscription(latest_content_delivered); + -- migrate existing values + UPDATE subscription SET latest_content_delivered = content_delivered; + -- no need for this index + DROP INDEX subscription_content_delivered_idx; + -- update the view to use latest_cotnent_delivered + CREATE OR REPLACE VIEW subscription_delivery_needed AS + SELECT s.* + FROM subscription s JOIN topic t ON s.topic_id = t.id + WHERE + s.expires > now() + AND + s.latest_content_delivered < t.content_updated + AND + s.delivery_next_attempt < now() + AND + s.id NOT IN (SELECT id FROM subscription_delivery_in_progress_active) + ; + + INSERT INTO _meta_schema_version (major, minor, patch) VALUES (1, 0, 2); +COMMIT; diff --git a/src/db/postgres/sql/schema/1.0.2/revert.sql b/src/db/postgres/sql/schema/1.0.2/revert.sql new file mode 100644 index 0000000..2752e6d --- /dev/null +++ b/src/db/postgres/sql/schema/1.0.2/revert.sql @@ -0,0 +1,21 @@ +BEGIN; + DROP INDEX subscription_latest_content_delivered_idx; + CREATE INDEX subscription_content_delivered_idx ON subscription(content_delivered); + DROP VIEW subscription_delivery_needed; + ALTER TABLE subscription + DROP COLUMN latest_content_delivered + ; + CREATE OR REPLACE VIEW subscription_delivery_needed AS + SELECT s.* + FROM subscription s JOIN topic t ON s.topic_id = t.id + WHERE + s.expires > now() + AND + s.content_delivered < t.content_updated + AND + s.delivery_next_attempt < now() + AND + s.id NOT IN (SELECT id FROM subscription_delivery_in_progress_active) + ; + DELETE FROM _meta_schema_version WHERE major = 1 AND minor = 0 AND patch = 2; +COMMIT; diff --git a/src/db/postgres/sql/subscription-delivery-success.sql b/src/db/postgres/sql/subscription-delivery-success.sql index 9ff01e0..b19b376 100644 --- a/src/db/postgres/sql/subscription-delivery-success.sql +++ b/src/db/postgres/sql/subscription-delivery-success.sql @@ -1,6 +1,7 @@ -- UPDATE subscription SET content_delivered = now(), + latest_content_delivered = $(topicContentUpdated), delivery_attempts_since_success = 0, delivery_next_attempt = '-infinity'::timestamptz WHERE diff --git a/src/db/sqlite/index.js b/src/db/sqlite/index.js index 07d6633..3fae300 100644 --- a/src/db/sqlite/index.js +++ b/src/db/sqlite/index.js @@ -20,12 +20,14 @@ const schemaVersionsSupported = { max: { major: 1, minor: 0, - patch: 1, + patch: 2, }, }; // max of signed int64 (2^63 - 1), should be enough const EPOCH_FOREVER = BigInt('9223372036854775807'); +const epochToDate = (epoch) => new Date(Number(epoch) * 1000); +const dateToEpoch = (date) => Math.round(date.getTime() / 1000); class DatabaseSQLite extends Database { constructor(logger, options) { @@ -85,10 +87,17 @@ class DatabaseSQLite extends Database { this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted }); migrationsWanted.forEach((v) => { const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql'); - // eslint-disable-next-line security/detect-non-literal-fs-filename - const fSql = fs.readFileSync(fPath, { encoding: 'utf8' }); - this.logger.info(_scope, 'applying migration', { version: v }); - this.db.exec(fSql); + try { + // eslint-disable-next-line security/detect-non-literal-fs-filename + const fSql = fs.readFileSync(fPath, { encoding: 'utf8' }); + this.logger.debug(_scope, 'applying migration', { version: v }); + const results = this.db.exec(fSql); + this.logger.debug(_scope, 'migration results', { results }); + this.logger.info(_scope, 'applied migration', { version: v }); + } catch (e) { + this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v }); + throw e; + } }); } @@ -304,7 +313,6 @@ class DatabaseSQLite extends Database { * @param {Object} data */ static _subscriptionDataToNative(data) { - const epochToDate = (epoch) => new Date(Number(epoch) * 1000); if (data) { ['created', 'verified', 'expires', 'contentDelivered'].forEach((field) => { // eslint-disable-next-line security/detect-object-injection @@ -414,14 +422,15 @@ class DatabaseSQLite extends Database { } - subscriptionDeliveryComplete(dbCtx, callback, topicId) { + subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) { const _scope = _fileScope('subscriptionDeliveryComplete'); - this.logger.debug(_scope, 'called', { callback, topicId }); + this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated }); let result; try { this.db.transaction(() => { - result = this.statement.subscriptionDeliverySuccess.run({ callback, topicId }); + topicContentUpdated = dateToEpoch(topicContentUpdated); + result = this.statement.subscriptionDeliverySuccess.run({ callback, topicId, topicContentUpdated }); if (result.changes != 1) { throw new DBErrors.UnexpectedResult('did not set subscription delivery success'); } @@ -432,7 +441,7 @@ class DatabaseSQLite extends Database { })(); return this._engineInfo(result); } catch (e) { - this.logger.error(_scope, 'failed', { error: e, callback, topicId }); + this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated }); throw e; } } @@ -698,7 +707,6 @@ class DatabaseSQLite extends Database { * @param {Object} data */ static _topicDataToNative(data) { - const epochToDate = (epoch) => new Date(Number(epoch) * 1000); if (data) { data.isActive = !!data.isActive; data.isDeleted = !!data.isDeleted; diff --git a/src/db/sqlite/sql/schema/1.0.2/apply.sql b/src/db/sqlite/sql/schema/1.0.2/apply.sql new file mode 100644 index 0000000..3a7598e --- /dev/null +++ b/src/db/sqlite/sql/schema/1.0.2/apply.sql @@ -0,0 +1,29 @@ +BEGIN; + -- track when content was delivered as separate from latest content delivered + -- content_delivered continues to be the time the content was delivered, but becomes only informational + -- latest_content_delivered is date on topic content delivered + ALTER TABLE subscription + ADD COLUMN latest_content_delivered INTEGER NOT NULL DEFAULT 0 + ; + CREATE INDEX subscription_latest_content_delivered_idx ON subscription(latest_content_delivered); + -- migrate existing values + UPDATE subscription SET latest_content_delivered = content_delivered; + -- no need for this index + DROP INDEX subscription_content_delivered_idx; + -- update the view to use latest_cotnent_delivered + DROP VIEW subscription_delivery_needed; + CREATE VIEW subscription_delivery_needed AS + SELECT s.* + FROM subscription s JOIN topic t ON s.topic_id = t.id + WHERE + s.expires > strftime('%s', 'now') + AND + s.latest_content_delivered < t.content_updated + AND + s.delivery_next_attempt < strftime('%s', 'now') + AND + s.id NOT IN (SELECT id FROM subscription_delivery_in_progress_active) + ; + + INSERT INTO _meta_schema_version (major, minor, patch) VALUES (1, 0, 2); +COMMIT; diff --git a/src/db/sqlite/sql/schema/1.0.2/revert.sql b/src/db/sqlite/sql/schema/1.0.2/revert.sql new file mode 100644 index 0000000..45d3ef1 --- /dev/null +++ b/src/db/sqlite/sql/schema/1.0.2/revert.sql @@ -0,0 +1,21 @@ +BEGIN; + DROP INDEX subscription_latest_content_delivered_idx; + DROP VIEW subscription_delivery_needed; + ALTER TABLE subscription + DROP COLUMN latest_content_delivered + ; + CREATE INDEX subscription_content_delivered_idx ON subscription(content_delivered); + CREATE VIEW subscription_delivery_needed AS + SELECT s.* + FROM subscription s JOIN topic t ON s.topic_id = t.id + WHERE + s.expires > strftime('%s', 'now') + AND + s.content_delivered < t.content_updated + AND + s.delivery_next_attempt < strftime('%s', 'now') + AND + s.id NOT IN (SELECT id FROM subscription_delivery_in_progress_active) + ; + DELETE FROM _meta_schema_version WHERE major = 1 AND minor = 0 AND patch = 2; +COMMIT; diff --git a/src/db/sqlite/sql/subscription-delivery-success.sql b/src/db/sqlite/sql/subscription-delivery-success.sql index 02b89e8..dba2632 100644 --- a/src/db/sqlite/sql/subscription-delivery-success.sql +++ b/src/db/sqlite/sql/subscription-delivery-success.sql @@ -1,6 +1,7 @@ -- UPDATE subscription SET content_delivered = strftime('%s', 'now'), + latest_content_delivered = :topicContentUpdated, delivery_attempts_since_success = 0, delivery_next_attempt = 0 WHERE diff --git a/test/src/db/integration.js b/test/src/db/integration.js index 8d5b615..83feac7 100644 --- a/test/src/db/integration.js +++ b/test/src/db/integration.js @@ -328,7 +328,8 @@ describe('Database Integration', function () { step('complete subscription', async function () { const { callback } = testData.subscriptionUpsert; await db.context(async (dbCtx) => { - await db.subscriptionDeliveryComplete(dbCtx, callback, topicId); + const topic = await db.topicGetById(dbCtx, topicId); + await db.subscriptionDeliveryComplete(dbCtx, callback, topicId, topic.contentUpdated); const subscription = await db.subscriptionGetById(dbCtx, subscriptionId); assert.strictEqual(Number(subscription.deliveryAttemptsSinceSuccess), 0); }); diff --git a/test/src/db/postgres.js b/test/src/db/postgres.js index 0f037d6..4a0ecd9 100644 --- a/test/src/db/postgres.js +++ b/test/src/db/postgres.js @@ -227,11 +227,24 @@ describe('DatabasePostgres', function () { }); it('covers migration', async function() { sinon.stub(db.db, 'oneOrNone').resolves({}); - sinon.stub(db.db, 'multiResult'); - sinon.stub(db, '_currentSchema').resolves(db.schemaVersionsSupported.max); + sinon.stub(db.db, 'multiResult').resolves({}); + sinon.stub(db, '_currentSchema').resolves(db.schemaVersionsSupported.min); sinon.stub(db.db, 'one').resolves(db.schemaVersionsSupported.max); await db.initialize(); }); + it('covers migration failure', async function() { + const expected = new Error('oh no'); + sinon.stub(db.db, 'oneOrNone').resolves({}); + sinon.stub(db.db, 'multiResult').rejects(expected); + sinon.stub(db, '_currentSchema').resolves(db.schemaVersionsSupported.min); + sinon.stub(db.db, 'one').resolves(db.schemaVersionsSupported.max); + try { + await db.initialize(); + assert.fail(noExpectedException); + } catch (e) { + assert.deepStrictEqual(e, expected); + } + }); it('covers listener', async function() { db.listener = { start: sinon.stub(), @@ -659,12 +672,16 @@ describe('DatabasePostgres', function () { }); // subscriptionDeliveryClaimById describe('subscriptionDeliveryComplete', function () { + let topicContentUpdated; + before(function () { + topicContentUpdated = new Date(); + }); it('success', async function() { const dbResult = { rowCount: 1, }; sinon.stub(db.db, 'result').resolves(dbResult); - await db.subscriptionDeliveryComplete(dbCtx, callback, topicId); + await db.subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated); }); it('failure', async function () { const dbResult = { @@ -672,7 +689,7 @@ describe('DatabasePostgres', function () { }; sinon.stub(db.db, 'result').onCall(0).resolves(dbResult); try { - await db.subscriptionDeliveryComplete(dbCtx, callback, topicId); + await db.subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated); assert.fail(noExpectedException); } catch (e) { assert(e instanceof DBErrors.UnexpectedResult); @@ -687,7 +704,7 @@ describe('DatabasePostgres', function () { }; sinon.stub(db.db, 'result').onCall(0).resolves(dbResult0).onCall(1).resolves(dbResult1); try { - await db.subscriptionDeliveryComplete(dbCtx, callback, topicId); + await db.subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated); assert.fail(noExpectedException); } catch (e) { assert(e instanceof DBErrors.UnexpectedResult); diff --git a/test/src/db/sqlite.js b/test/src/db/sqlite.js index 2af1262..bd66120 100644 --- a/test/src/db/sqlite.js +++ b/test/src/db/sqlite.js @@ -66,6 +66,38 @@ describe('DatabaseSQLite', function () { }); }); // Implementation + describe('_initTables', function () { + let preparedGet; + beforeEach(function () { + preparedGet = sinon.stub(); + sinon.stub(db.db, 'prepare').returns({ + pluck: () => ({ + bind: () => ({ + get: preparedGet, + }), + }), + }); + sinon.stub(db, '_currentSchema').returns(db.schemaVersionsSupported.min); + sinon.stub(db.db, 'exec'); + }); + it('covers migration', async function() { + preparedGet.returns({}); + await db._initTables(); + assert(db.db.exec.called); + }); + it('covers migration failure', async function() { + const expected = new Error('oh no'); + preparedGet.returns({}); + db.db.exec.throws(expected); + try { + await db._initTables(); + assert.fail(noExpectedException); + } catch (e) { + assert.deepStrictEqual(e, expected); + } + }); + }); // _initTables + describe('_currentSchema', function () { it('covers', async function () { const version = { major: 1, minor: 0, patch: 0 }; @@ -494,13 +526,17 @@ describe('DatabaseSQLite', function () { }); // subscriptionDeliveryClaimById describe('subscriptionDeliveryComplete', function () { + let topicContentUpdated; + before(function () { + topicContentUpdated = new Date(); + }); it('success', async function() { const dbResult = { changes: 1, }; sinon.stub(db.statement.subscriptionDeliverySuccess, 'run').returns(dbResult); sinon.stub(db.statement.subscriptionDeliveryDone, 'run').returns(dbResult); - await db.subscriptionDeliveryComplete(dbCtx, callback, topicId); + await db.subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated); }); it('failure', async function () { const dbResult = { @@ -509,7 +545,7 @@ describe('DatabaseSQLite', function () { sinon.stub(db.statement.subscriptionDeliverySuccess, 'run').returns(dbResult); sinon.stub(db.statement.subscriptionDeliveryDone, 'run').returns(dbResult); try { - await db.subscriptionDeliveryComplete(dbCtx, callback, topicId); + await db.subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated); assert.fail(noExpectedException); } catch (e) { assert(e instanceof DBErrors.UnexpectedResult); @@ -525,7 +561,7 @@ describe('DatabaseSQLite', function () { sinon.stub(db.statement.subscriptionDeliverySuccess, 'run').returns(dbResult0); sinon.stub(db.statement.subscriptionDeliveryDone, 'run').returns(dbResult1); try { - await db.subscriptionDeliveryComplete(dbCtx, callback, topicId); + await db.subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated); assert.fail(noExpectedException); } catch (e) { assert(e instanceof DBErrors.UnexpectedResult); -- 2.43.2