### Added
- Accept multiple topics in publish requests.
+- Expired subscription entries are removed from the database when their topics are updated.
+- Topics which have been marked deleted are removed from the database after all subscribers have been notified.
## [v1.1.5] - 2021-08-23
}
if (!topic.isActive) {
+ // These should be filtered out when selecting verification tasks to process.
this.logger.debug(_scope, 'topic not active, skipping verification', { verification, requestId });
await this.db.verificationRelease(dbCtx, verificationId);
return;
case Enum.Mode.Unsubscribe:
if (verificationAccepted) {
await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
+ if (topic.isDeleted) {
+ // Remove a deleted topic after the last subscription is notified.
+ await this.db.topicPendingDelete(txCtx, topic.id);
+ }
}
break;
case Enum.Mode.Denied:
await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
+ if (topic.isDeleted) {
+ // Remove a deleted topic after he last subscription is notified.
+ await this.db.topicPendingDelete(txCtx, topic.id);
+ }
break;
default:
throw new Errors.InternalInconsistencyError('no such topic id');
}
+ // Cull any expired subscriptions
+ await this.db.subscriptionDeleteExpired(dbCtx, topicId);
+
logInfoData.url = topicId.url;
if (topic.isDeleted) {
const validHub = await this.linkHelper.validHub(topic.url, response.headers, response.data);
if (!validHub) {
- this.logger.debug(_scope, 'retrieved topic does not list us as hub', { logInfoData });
+ this.logger.info(_scope, 'retrieved topic does not list us as hub', { logInfoData });
if (this.options.communication.strictTopicHubLink) {
await this.db.transaction(dbCtx, async (txCtx) => {
// Set as deleted and set content_updated so subscriptions are notified.
await this.db.topicDeleted(txCtx, topicId);
await this.db.topicFetchComplete(txCtx, topicId);
});
+ // Attempt to remove from db, if no active subscriptions.
+ await this.db.topicPendingDelete(dbCtx, topicId);
return;
}
}
}
+ /**
+ * Remove any expired subscriptions to a topic.
+ * @param {*} dbCtx
+ * @param {*} topicId
+ */
+ async subscriptionDeleteExpired(dbCtx, topicId) {
+ this._notImplemented('subscriptionDeleteExpired', arguments);
+ }
+
+
/**
* Claim subscriptions needing content updates attempted.
* @param {*} dbCtx
this._notImplemented('topicGetAll', arguments);
}
+
/**
* Get topic data, without content.
* @param {*} dbCtx
this._notImplemented('topicGetContentById', arguments);
}
- // /**
- // * Call after an unsubscribe, to check if a topic is awaiting deletion, and that
- // * was the last subscription belaying it.
- // * @param {String|Integer} data topic url or id
- // */
- // async topicPendingDelete(dbCtx, data) {
- // this._notImplemented('topicPendingDelete', arguments);
- // }
+
+ /**
+ * Attempt to delete a topic, which must be set isDeleted, if there
+ * are no more subscriptions belaying its removal.
+ * @param {*} topicId
+ */
+ async topicPendingDelete(dbCtx, topicId) {
+ this._notImplemented('topicPendingDelete', arguments);
+ }
/**
}
+ async subscriptionDeleteExpired(dbCtx, topicId) {
+ const _scope = _fileScope('subscriptionDeleteExpired');
+ this.logger.debug(_scope, 'called', { topicId });
+
+ try {
+ const result = await dbCtx.result(this.statement.subscriptionDeleteExpired, { topicId });
+ this.logger.debug(_scope, 'success', { topicId, deleted: result.rowCount });
+ return this._engineInfo(result);
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, topicId });
+ throw e;
+ }
+ }
+
+
async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
const _scope = _fileScope('subscriptionDeliveryClaim');
this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
}
+ async topicPendingDelete(dbCtx, topicId) {
+ const _scope = _fileScope('topicPendingDelete');
+ this.logger.debug(_scope, 'called', { topicId });
+
+ try {
+ await dbCtx.txIf(async (txCtx) => {
+ const topic = await txCtx.one(this.statement.topicGetById, { topicId });
+ if (!topic.isDeleted) {
+ this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
+ return;
+ }
+
+ const { count: subscriberCount } = await txCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl: topic.url });
+ if (subscriberCount) {
+ this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
+ return;
+ }
+
+ const result = await txCtx.result(this.statement.topicDeleteById, { topicId });
+ if (result.rowCount !== 1) {
+ throw new DBErrors.UnexpectedResult('did not delete topic');
+ }
+ });
+ this.logger.debug(_scope, 'success', { topicId });
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, topicId });
+ throw e;
+ }
+ }
+
+
async topicSet(dbCtx, data) {
const _scope = _fileScope('topicSet');
this.logger.debug(_scope, 'called', data);
--- /dev/null
+--
+DELETE FROM subscription
+WHERE topic_id = $(topicId) AND expires < now()
+
--- /dev/null
+--
+DELETE FROM topic
+WHERE id = $(topicId)
+
}
+ subscriptionDeleteExpired(dbCtx, topicId) {
+ const _scope = _fileScope('subscriptionDeleteExpired');
+ this.logger.debug(_scope, 'called', { topicId });
+
+ try {
+ const result = this.statement.subscriptionDeleteExpired.run({ topicId });
+ this.logger.debug(_scope, 'success', { topicId, deleted: result.changes });
+ return this._engineInfo(result);
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, topicId });
+ throw e;
+ }
+ }
+
+
subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
const _scope = _fileScope('subscriptionDeliveryClaim');
this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
}
+ topicPendingDelete(dbCtx, topicId) {
+ const _scope = _fileScope('topicPendingDelete');
+ this.logger.debug(_scope, 'called', { topicId });
+
+ try {
+ this.db.transaction(() => {
+ const topic = this.statement.topicGetById.get({ topicId });
+ if (!topic.isDeleted) {
+ this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
+ return;
+ }
+
+ const { count: subscriberCount } = this.statement.subscriptionCountByTopicUrl.get({ topicUrl: topic.url });
+ if (subscriberCount) {
+ this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
+ return;
+ }
+
+ const result = this.statement.topicDeleteById.run({ topicId });
+ if (result.changes !== 1) {
+ throw new DBErrors.UnexpectedResult('did not delete topic');
+ }
+ })();
+ this.logger.debug(_scope, 'success', { topicId });
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, topicId });
+ throw e;
+ }
+ }
+
+
topicSet(dbCtx, data) {
const _scope = _fileScope('topicSet');
this.logger.debug(_scope, 'called', data);
--- /dev/null
+--
+DELETE FROM subscription
+WHERE topic_id = :topicId AND expires < strftime('%s', 'now')
--- /dev/null
+--
+DELETE FROM topic
+WHERE id = :topicId
+
await this.db.topicDeleted(txCtx, topicId);
res.end();
this.logger.info(_scope, 'topic set deleted', { ctx, topicId });
+ // Attempt to remove from db if no active subscriptions.
+ await this.db.topicPendingDelete(txCtx, topicId);
return;
}
assert(communication.db.verificationComplete.called);
});
+ it('unsubscription from deleted topic deletes topic', async function () {
+ communication.db.verificationGetById.restore();
+ verification.mode = 'unsubscribe';
+ sinon.stub(communication.db, 'verificationGetById').resolves(verification);
+ communication.db.topicGetById.restore();
+ sinon.stub(communication.db, 'topicGetById').resolves({
+ ...topic,
+ isDeleted: true,
+ });
+
+ await communication.verificationProcess(dbCtx, callback, topicId, requestId);
+
+ assert(communication.db.subscriptionDelete.called);
+ assert(communication.db.verificationComplete.called);
+ assert(communication.db.topicPendingDelete.called);
+ });
+
it('unsubscription denial succeeds', async function () {
communication.db.verificationGetById.restore();
verification.mode = 'unsubscribe';
}); // Authentication
describe('Topic', function () {
+ let anotherTopicId;
step('requires data', async function () {
try {
await db.context(async (dbCtx) => {
step('deletes a topic', async function () {
await db.context(async (dbCtx) => {
const result = await db.topicSet(dbCtx, testData.anotherTopicSet);
- const anotherTopicId = result.lastInsertRowid;
+ anotherTopicId = result.lastInsertRowid;
await db.topicDeleted(dbCtx, anotherTopicId);
const topic = await db.topicGetById(dbCtx, anotherTopicId);
assert.strictEqual(topic.isDeleted, true);
step('update un-deletes a topic', async function () {
await db.context(async (dbCtx) => {
const result = await db.topicSet(dbCtx, testData.anotherTopicSet);
- const anotherTopicId = result.lastInsertRowid;
+ assert.strictEqual(result.lastInsertRowid, anotherTopicId);
const topic = await db.topicGetById(dbCtx, anotherTopicId);
assert.strictEqual(topic.isDeleted, false);
});
assert(topics.length);
});
});
+ // pending delete of deleted topic with no subscriptions
+ step('really deletes unsubscribed deleted topic', async function() {
+ await db.context(async (dbCtx) => {
+ await db.topicDeleted(dbCtx, anotherTopicId);
+ await db.topicPendingDelete(dbCtx, anotherTopicId);
+ const topic = await db.topicGetById(dbCtx, anotherTopicId);
+ assert(!topic);
+ });
+ });
}); // Topic
describe('Subscription', function () {
assert(!subscription);
});
});
+ step('create expired subscription', async function () {
+ const data = {
+ ...testData.subscriptionUpsert,
+ secret: 'newSecret',
+ topicId,
+ leaseSeconds: -1,
+ };
+ await db.context(async (dbCtx) => {
+ const result = await db.subscriptionUpsert(dbCtx, data);
+ assert(result.lastInsertRowid);
+ assert.notStrictEqual(result.lastInsertRowid, subscriptionId);
+ subscriptionId = result.lastInsertRowid;
+ assert.strictEqual(result.changes, 1);
+ });
+ });
+ step('delete expired subscriptions', async function() {
+ await db.context(async (dbCtx) => {
+ await db.subscriptionDeleteExpired(dbCtx, topicId)
+ const subscription = await db.subscriptionGet(dbCtx, testData.subscriptionUpsert.callback, topicId);
+ assert(!subscription);
+ });
+ });
}); // Subscription
describe('Verification', function () {
changes: 1,
lastInsertRowid: undefined,
duration: 10,
- }
+ };
sinon.stub(db.db, 'result').resolves(dbResult);
const result = await db.subscriptionDelete(dbCtx, callback, topicId);
assert.deepStrictEqual(result, expected);
});
}); // subscriptionDelete
+ describe('subscriptionDeleteExpired', function () {
+ it('success', async function () {
+ const dbResult = {
+ rowCount: 1,
+ rows: [],
+ duration: 10,
+ };
+ const expected = {
+ changes: 1,
+ lastInsertRowid: undefined,
+ duration: 10,
+ };
+ sinon.stub(db.db, 'result').resolves(dbResult);
+ const result = await db.subscriptionDeleteExpired(dbCtx, topicId);
+ assert.deepStrictEqual(result, expected);
+ });
+ it('failure', async function() {
+ const expected = new Error();
+ sinon.stub(db.db, 'result').rejects(expected);
+ try {
+ await db.subscriptionDeleteExpired(dbCtx, topicId);
+ assert.fail(noExpectedException);
+ } catch (e) {
+ assert.deepStrictEqual(e, expected);
+ }
+ });
+ });
+
describe('subscriptionDeliveryClaim', function () {
it('success', async function() {
const dbResult = [
});
}); // topicGetContentById
+ describe('topicPendingDelete', function () {
+ beforeEach(function () {
+ sinon.stub(db.db, 'one');
+ sinon.stub(db.db, 'result');
+ });
+ it('success', async function () {
+ db.db.one.onCall(0).resolves({
+ id: topicId,
+ isDeleted: true,
+ }).onCall(1).resolves({
+ count: 0,
+ });
+ const dbResult = {
+ rowCount: 1,
+ rows: [],
+ duration: 10,
+ };
+ db.db.result.resolves(dbResult);
+ await db.topicPendingDelete(dbCtx, topicId);
+ assert(db.db.result.called);
+ });
+ it('does not delete non-deleted topic', async function () {
+ db.db.one.onCall(0).resolves({
+ id: topicId,
+ isDeleted: false,
+ }).onCall(1).resolves({
+ count: 0,
+ });
+ await db.topicPendingDelete(dbCtx, topicId);
+ assert(!db.db.result.called);
+ });
+ it('does not delete topic with active subscriptions', async function () {
+ db.db.one.onCall(0).resolves({
+ id: topicId,
+ isDeleted: true,
+ }).onCall(1).resolves({
+ count: 10,
+ });
+ await db.topicPendingDelete(dbCtx, topicId);
+ assert(!db.db.result.called);
+ });
+ it('covers no deletion', async function () {
+ db.db.one.onCall(0).resolves({
+ id: topicId,
+ isDeleted: true,
+ }).onCall(1).resolves({
+ count: 0,
+ });
+ const dbResult = {
+ rowCount: 0,
+ rows: [],
+ duration: 10,
+ };
+ db.db.result.resolves(dbResult);
+ try {
+ await db.topicPendingDelete(dbCtx, topicId);
+ assert.fail(noExpectedException);
+ } catch (e) {
+ assert(e instanceof DBErrors.UnexpectedResult);
+ }
+ });
+ });
+
describe('topicSet', function () {
let data;
beforeEach(function () {
});
}); // subscriptionDelete
+ describe('subscriptionDeleteExpired', function () {
+ it('success', async function () {
+ const dbResult = {
+ changes: 1,
+ lastInsertRowid: undefined,
+ };
+ const expected = {
+ changes: 1,
+ lastInsertRowid: undefined,
+ };
+ sinon.stub(db.statement.subscriptionDeleteExpired, 'run').returns(dbResult);
+ const result = await db.subscriptionDeleteExpired(dbCtx, topicId);
+ assert.deepStrictEqual(result, expected);
+ });
+ it('failure', async function () {
+ const expected = new Error();
+ sinon.stub(db.statement.subscriptionDeleteExpired, 'run').throws(expected);
+ try {
+ await db.subscriptionDeleteExpired(dbCtx, topicId);
+ assert.fail(noExpectedException);
+ } catch (e) {
+ assert.deepStrictEqual(e, expected);
+ }
+ });
+ });
+
describe('subscriptionDeliveryClaim', function () {
- it('success', async function() {
+ it('success', async function () {
const dbAllResult = [
{
id: 'c2e254c5-aa6e-4a8f-b1a1-e474b07392bb',
});
}); // topicGetContentById
+ describe('topicPendingDelete', function () {
+ beforeEach(function () {
+ sinon.stub(db.statement.topicGetById, 'get');
+ sinon.stub(db.statement.subscriptionCountByTopicUrl, 'get');
+ sinon.stub(db.statement.topicDeleteById, 'run');
+ });
+ it('success', async function () {
+ db.statement.topicGetById.get.returns({
+ id: topicId,
+ isDeleted: true,
+ });
+ db.statement.subscriptionCountByTopicUrl.get.returns({
+ count: 0,
+ });
+ db.statement.topicDeleteById.run.returns({
+ changes: 1,
+ });
+ db.topicPendingDelete(dbCtx, topicId);
+ assert(db.statement.topicDeleteById.run.called);
+ });
+ it('does not delete non-deleted topic', async function () {
+ db.statement.topicGetById.get.returns({
+ id: topicId,
+ isDeleted: false,
+ });
+ db.statement.subscriptionCountByTopicUrl.get.returns({
+ count: 0,
+ });
+ db.statement.topicDeleteById.run.returns({
+ changes: 1,
+ });
+ db.topicPendingDelete(dbCtx, topicId);
+ assert(!db.statement.topicDeleteById.run.called);
+ });
+ it('does not delete topic with active subscriptions', async function () {
+ db.statement.topicGetById.get.returns({
+ id: topicId,
+ isDeleted: true,
+ });
+ db.statement.subscriptionCountByTopicUrl.get.returns({
+ count: 10,
+ });
+ db.statement.topicDeleteById.run.returns({
+ changes: 1,
+ });
+ db.topicPendingDelete(dbCtx, topicId);
+ assert(!db.statement.topicDeleteById.run.called);
+ });
+ it('covers no deletion', async function () {
+ db.statement.topicGetById.get.returns({
+ id: topicId,
+ isDeleted: true,
+ });
+ db.statement.subscriptionCountByTopicUrl.get.returns({
+ count: 0,
+ });
+ db.statement.topicDeleteById.run.returns({
+ changes: 0,
+ });
+ try {
+ db.topicPendingDelete(dbCtx, topicId);
+ assert.fail(noExpectedException);
+
+ } catch (e) {
+ assert(e instanceof DBErrors.UnexpectedResult);
+ }
+ assert(db.statement.topicDeleteById.run.called);
+ });
+ });
+
describe('topicSet', function () {
let data;
beforeEach(function () {
'subscriptionsByTopicId',
'subscriptionCountByTopicUrl',
'subscriptionDelete',
+ 'subscriptionDeleteExpired',
'subscriptionDeliveryClaim',
'subscriptionDeliveryClaimById',
'subscriptionDeliveryComplete',
'topicGetById',
'topicGetByUrl',
'topicGetContentById',
+ 'topicPendingDelete',
'topicSet',
'topicSetContent',
'topicUpdate',