expired subscriptions and deleted topics with no subscribers are now removed from...
[websub-hub] / src / db / postgres / index.js
index 255909780fe71c41133fc4b29c30eb5ec563f27a..213fa1031c3c44fd29c48a46a8af86ff8bed6217 100644 (file)
@@ -421,6 +421,21 @@ class DatabasePostgres extends Database {
   }
 
 
+  async subscriptionDeleteExpired(dbCtx, topicId) {
+    const _scope = _fileScope('subscriptionDeleteExpired');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    try {
+      const result = await dbCtx.result(this.statement.subscriptionDeleteExpired, { topicId });
+      this.logger.debug(_scope, 'success', { topicId, deleted: result.rowCount });
+      return this._engineInfo(result);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topicId });
+      throw e;
+    }
+  }
+
+
   async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
     const _scope = _fileScope('subscriptionDeliveryClaim');
     this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
@@ -804,6 +819,37 @@ class DatabasePostgres extends Database {
   }
 
 
+  async topicPendingDelete(dbCtx, topicId) {
+    const _scope = _fileScope('topicPendingDelete');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    try {
+      await dbCtx.txIf(async (txCtx) => {
+        const topic = await txCtx.one(this.statement.topicGetById, { topicId });
+        if (!topic.isDeleted) {
+          this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
+          return;
+        }
+
+        const { count: subscriberCount } = await txCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl: topic.url });
+        if (subscriberCount) {
+          this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
+          return;
+        }
+
+        const result = await txCtx.result(this.statement.topicDeleteById, { topicId });
+        if (result.rowCount !== 1) {
+          throw new DBErrors.UnexpectedResult('did not delete topic');
+        }
+      });
+      this.logger.debug(_scope, 'success', { topicId });
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topicId });
+      throw e;
+    }
+  }
+
+
   async topicSet(dbCtx, data) {
     const _scope = _fileScope('topicSet');
     this.logger.debug(_scope, 'called', data);