await this.db.transaction(dbCtx, async (txCtx) => {
await this.db.verificationInsert(txCtx, verification);
- await this.db.subscriptionDeliveryComplete(txCtx, subscription.callback, subscription.topicId);
+ await this.db.subscriptionDeliveryComplete(txCtx, subscription.callback, subscription.topicId, topic.contentUpdated);
});
this.logger.info(_scope, 'update unsubscription for deleted topic', logInfoData);
return;
return;
}
- await this.db.subscriptionDeliveryComplete(dbCtx, subscription.callback, subscription.topicId);
+ await this.db.subscriptionDeliveryComplete(dbCtx, subscription.callback, subscription.topicId, topic.contentUpdated);
this.logger.info(_scope, 'update success', logInfoData);
}
max: {
major: 1,
minor: 0,
- patch: 1,
+ patch: 2,
},
};
this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
for (const v of migrationsWanted) {
const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
- const migrationSql = _queryFile(fPath);
- const results = await this.db.multiResult(migrationSql);
- this.logger.debug(_scope, 'executed migration sql', { version: v, results });
- this.logger.info(_scope, 'applied migration', { version: v });
+ try {
+ const migrationSql = _queryFile(fPath);
+ this.logger.debug(_scope, 'applying migration', { version: v });
+ const results = await this.db.multiResult(migrationSql);
+ this.logger.debug(_scope, 'migration results', { results });
+ this.logger.info(_scope, 'applied migration', { version: v });
+ } catch (e) {
+ this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v });
+ throw e;
+ }
}
}
}
- async subscriptionDeliveryComplete(dbCtx, callback, topicId) {
+ async subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) {
const _scope = _fileScope('subscriptionDeliveryComplete');
- this.logger.debug(_scope, 'called', { callback, topicId });
+ this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated });
let result;
try {
await dbCtx.txIf(async (txCtx) => {
- result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId });
+ result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId, topicContentUpdated });
if (result.rowCount != 1) {
throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
}
}
});
} catch (e) {
- this.logger.error(_scope, 'failed', { error: e, callback, topicId });
+ this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated });
throw e;
}
}
--- /dev/null
+BEGIN;
+ -- track when content was delivered as separate from latest content delivered
+ -- content_delivered continues to be the time the content was delivered, but becomes only informational
+ -- latest_content_delivered is date on topic content delivered
+ ALTER TABLE subscription
+ ADD COLUMN latest_content_delivered TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT '-infinity'::timestamptz
+ ;
+ CREATE INDEX subscription_latest_content_delivered_idx ON subscription(latest_content_delivered);
+ -- migrate existing values
+ UPDATE subscription SET latest_content_delivered = content_delivered;
+ -- no need for this index
+ DROP INDEX subscription_content_delivered_idx;
+ -- update the view to use latest_cotnent_delivered
+ CREATE OR REPLACE VIEW subscription_delivery_needed AS
+ SELECT s.*
+ FROM subscription s JOIN topic t ON s.topic_id = t.id
+ WHERE
+ s.expires > now()
+ AND
+ s.latest_content_delivered < t.content_updated
+ AND
+ s.delivery_next_attempt < now()
+ AND
+ s.id NOT IN (SELECT id FROM subscription_delivery_in_progress_active)
+ ;
+
+ INSERT INTO _meta_schema_version (major, minor, patch) VALUES (1, 0, 2);
+COMMIT;
--- /dev/null
+BEGIN;
+ DROP INDEX subscription_latest_content_delivered_idx;
+ CREATE INDEX subscription_content_delivered_idx ON subscription(content_delivered);
+ DROP VIEW subscription_delivery_needed;
+ ALTER TABLE subscription
+ DROP COLUMN latest_content_delivered
+ ;
+ CREATE OR REPLACE VIEW subscription_delivery_needed AS
+ SELECT s.*
+ FROM subscription s JOIN topic t ON s.topic_id = t.id
+ WHERE
+ s.expires > now()
+ AND
+ s.content_delivered < t.content_updated
+ AND
+ s.delivery_next_attempt < now()
+ AND
+ s.id NOT IN (SELECT id FROM subscription_delivery_in_progress_active)
+ ;
+ DELETE FROM _meta_schema_version WHERE major = 1 AND minor = 0 AND patch = 2;
+COMMIT;
--
UPDATE subscription SET
content_delivered = now(),
+ latest_content_delivered = $(topicContentUpdated),
delivery_attempts_since_success = 0,
delivery_next_attempt = '-infinity'::timestamptz
WHERE
max: {
major: 1,
minor: 0,
- patch: 1,
+ patch: 2,
},
};
// max of signed int64 (2^63 - 1), should be enough
const EPOCH_FOREVER = BigInt('9223372036854775807');
+const epochToDate = (epoch) => new Date(Number(epoch) * 1000);
+const dateToEpoch = (date) => Math.round(date.getTime() / 1000);
class DatabaseSQLite extends Database {
constructor(logger, options) {
this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
migrationsWanted.forEach((v) => {
const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
- // eslint-disable-next-line security/detect-non-literal-fs-filename
- const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
- this.logger.info(_scope, 'applying migration', { version: v });
- this.db.exec(fSql);
+ try {
+ // eslint-disable-next-line security/detect-non-literal-fs-filename
+ const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
+ this.logger.debug(_scope, 'applying migration', { version: v });
+ const results = this.db.exec(fSql);
+ this.logger.debug(_scope, 'migration results', { results });
+ this.logger.info(_scope, 'applied migration', { version: v });
+ } catch (e) {
+ this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v });
+ throw e;
+ }
});
}
* @param {Object} data
*/
static _subscriptionDataToNative(data) {
- const epochToDate = (epoch) => new Date(Number(epoch) * 1000);
if (data) {
['created', 'verified', 'expires', 'contentDelivered'].forEach((field) => {
// eslint-disable-next-line security/detect-object-injection
}
- subscriptionDeliveryComplete(dbCtx, callback, topicId) {
+ subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) {
const _scope = _fileScope('subscriptionDeliveryComplete');
- this.logger.debug(_scope, 'called', { callback, topicId });
+ this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated });
let result;
try {
this.db.transaction(() => {
- result = this.statement.subscriptionDeliverySuccess.run({ callback, topicId });
+ topicContentUpdated = dateToEpoch(topicContentUpdated);
+ result = this.statement.subscriptionDeliverySuccess.run({ callback, topicId, topicContentUpdated });
if (result.changes != 1) {
throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
}
})();
return this._engineInfo(result);
} catch (e) {
- this.logger.error(_scope, 'failed', { error: e, callback, topicId });
+ this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated });
throw e;
}
}
* @param {Object} data
*/
static _topicDataToNative(data) {
- const epochToDate = (epoch) => new Date(Number(epoch) * 1000);
if (data) {
data.isActive = !!data.isActive;
data.isDeleted = !!data.isDeleted;
--- /dev/null
+BEGIN;
+ -- track when content was delivered as separate from latest content delivered
+ -- content_delivered continues to be the time the content was delivered, but becomes only informational
+ -- latest_content_delivered is date on topic content delivered
+ ALTER TABLE subscription
+ ADD COLUMN latest_content_delivered INTEGER NOT NULL DEFAULT 0
+ ;
+ CREATE INDEX subscription_latest_content_delivered_idx ON subscription(latest_content_delivered);
+ -- migrate existing values
+ UPDATE subscription SET latest_content_delivered = content_delivered;
+ -- no need for this index
+ DROP INDEX subscription_content_delivered_idx;
+ -- update the view to use latest_cotnent_delivered
+ DROP VIEW subscription_delivery_needed;
+ CREATE VIEW subscription_delivery_needed AS
+ SELECT s.*
+ FROM subscription s JOIN topic t ON s.topic_id = t.id
+ WHERE
+ s.expires > strftime('%s', 'now')
+ AND
+ s.latest_content_delivered < t.content_updated
+ AND
+ s.delivery_next_attempt < strftime('%s', 'now')
+ AND
+ s.id NOT IN (SELECT id FROM subscription_delivery_in_progress_active)
+ ;
+
+ INSERT INTO _meta_schema_version (major, minor, patch) VALUES (1, 0, 2);
+COMMIT;
--- /dev/null
+BEGIN;
+ DROP INDEX subscription_latest_content_delivered_idx;
+ DROP VIEW subscription_delivery_needed;
+ ALTER TABLE subscription
+ DROP COLUMN latest_content_delivered
+ ;
+ CREATE INDEX subscription_content_delivered_idx ON subscription(content_delivered);
+ CREATE VIEW subscription_delivery_needed AS
+ SELECT s.*
+ FROM subscription s JOIN topic t ON s.topic_id = t.id
+ WHERE
+ s.expires > strftime('%s', 'now')
+ AND
+ s.content_delivered < t.content_updated
+ AND
+ s.delivery_next_attempt < strftime('%s', 'now')
+ AND
+ s.id NOT IN (SELECT id FROM subscription_delivery_in_progress_active)
+ ;
+ DELETE FROM _meta_schema_version WHERE major = 1 AND minor = 0 AND patch = 2;
+COMMIT;
--
UPDATE subscription SET
content_delivered = strftime('%s', 'now'),
+ latest_content_delivered = :topicContentUpdated,
delivery_attempts_since_success = 0,
delivery_next_attempt = 0
WHERE
step('complete subscription', async function () {
const { callback } = testData.subscriptionUpsert;
await db.context(async (dbCtx) => {
- await db.subscriptionDeliveryComplete(dbCtx, callback, topicId);
+ const topic = await db.topicGetById(dbCtx, topicId);
+ await db.subscriptionDeliveryComplete(dbCtx, callback, topicId, topic.contentUpdated);
const subscription = await db.subscriptionGetById(dbCtx, subscriptionId);
assert.strictEqual(Number(subscription.deliveryAttemptsSinceSuccess), 0);
});
});
it('covers migration', async function() {
sinon.stub(db.db, 'oneOrNone').resolves({});
- sinon.stub(db.db, 'multiResult');
- sinon.stub(db, '_currentSchema').resolves(db.schemaVersionsSupported.max);
+ sinon.stub(db.db, 'multiResult').resolves({});
+ sinon.stub(db, '_currentSchema').resolves(db.schemaVersionsSupported.min);
sinon.stub(db.db, 'one').resolves(db.schemaVersionsSupported.max);
await db.initialize();
});
+ it('covers migration failure', async function() {
+ const expected = new Error('oh no');
+ sinon.stub(db.db, 'oneOrNone').resolves({});
+ sinon.stub(db.db, 'multiResult').rejects(expected);
+ sinon.stub(db, '_currentSchema').resolves(db.schemaVersionsSupported.min);
+ sinon.stub(db.db, 'one').resolves(db.schemaVersionsSupported.max);
+ try {
+ await db.initialize();
+ assert.fail(noExpectedException);
+ } catch (e) {
+ assert.deepStrictEqual(e, expected);
+ }
+ });
it('covers listener', async function() {
db.listener = {
start: sinon.stub(),
}); // subscriptionDeliveryClaimById
describe('subscriptionDeliveryComplete', function () {
+ let topicContentUpdated;
+ before(function () {
+ topicContentUpdated = new Date();
+ });
it('success', async function() {
const dbResult = {
rowCount: 1,
};
sinon.stub(db.db, 'result').resolves(dbResult);
- await db.subscriptionDeliveryComplete(dbCtx, callback, topicId);
+ await db.subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated);
});
it('failure', async function () {
const dbResult = {
};
sinon.stub(db.db, 'result').onCall(0).resolves(dbResult);
try {
- await db.subscriptionDeliveryComplete(dbCtx, callback, topicId);
+ await db.subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated);
assert.fail(noExpectedException);
} catch (e) {
assert(e instanceof DBErrors.UnexpectedResult);
};
sinon.stub(db.db, 'result').onCall(0).resolves(dbResult0).onCall(1).resolves(dbResult1);
try {
- await db.subscriptionDeliveryComplete(dbCtx, callback, topicId);
+ await db.subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated);
assert.fail(noExpectedException);
} catch (e) {
assert(e instanceof DBErrors.UnexpectedResult);
});
}); // Implementation
+ describe('_initTables', function () {
+ let preparedGet;
+ beforeEach(function () {
+ preparedGet = sinon.stub();
+ sinon.stub(db.db, 'prepare').returns({
+ pluck: () => ({
+ bind: () => ({
+ get: preparedGet,
+ }),
+ }),
+ });
+ sinon.stub(db, '_currentSchema').returns(db.schemaVersionsSupported.min);
+ sinon.stub(db.db, 'exec');
+ });
+ it('covers migration', async function() {
+ preparedGet.returns({});
+ await db._initTables();
+ assert(db.db.exec.called);
+ });
+ it('covers migration failure', async function() {
+ const expected = new Error('oh no');
+ preparedGet.returns({});
+ db.db.exec.throws(expected);
+ try {
+ await db._initTables();
+ assert.fail(noExpectedException);
+ } catch (e) {
+ assert.deepStrictEqual(e, expected);
+ }
+ });
+ }); // _initTables
+
describe('_currentSchema', function () {
it('covers', async function () {
const version = { major: 1, minor: 0, patch: 0 };
}); // subscriptionDeliveryClaimById
describe('subscriptionDeliveryComplete', function () {
+ let topicContentUpdated;
+ before(function () {
+ topicContentUpdated = new Date();
+ });
it('success', async function() {
const dbResult = {
changes: 1,
};
sinon.stub(db.statement.subscriptionDeliverySuccess, 'run').returns(dbResult);
sinon.stub(db.statement.subscriptionDeliveryDone, 'run').returns(dbResult);
- await db.subscriptionDeliveryComplete(dbCtx, callback, topicId);
+ await db.subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated);
});
it('failure', async function () {
const dbResult = {
sinon.stub(db.statement.subscriptionDeliverySuccess, 'run').returns(dbResult);
sinon.stub(db.statement.subscriptionDeliveryDone, 'run').returns(dbResult);
try {
- await db.subscriptionDeliveryComplete(dbCtx, callback, topicId);
+ await db.subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated);
assert.fail(noExpectedException);
} catch (e) {
assert(e instanceof DBErrors.UnexpectedResult);
sinon.stub(db.statement.subscriptionDeliverySuccess, 'run').returns(dbResult0);
sinon.stub(db.statement.subscriptionDeliveryDone, 'run').returns(dbResult1);
try {
- await db.subscriptionDeliveryComplete(dbCtx, callback, topicId);
+ await db.subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated);
assert.fail(noExpectedException);
} catch (e) {
assert(e instanceof DBErrors.UnexpectedResult);