db migration 1.0.2, now stores and indexes date of content delivered to subscriber...
[websub-hub] / src / db / postgres / index.js
index 1c5d1d1c4f2253a79c3053232781f2bbb7e45f56..a90e0cbb4277c02ae2a35d16ef68d9e3434ca6ed 100644 (file)
@@ -30,7 +30,7 @@ const schemaVersionsSupported = {
   max: {
     major: 1,
     minor: 0,
-    patch: 0,
+    patch: 2,
   },
 };
 
@@ -162,10 +162,16 @@ class DatabasePostgres extends Database {
     this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
     for (const v of migrationsWanted) {
       const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
-      const migrationSql = _queryFile(fPath);
-      const results = await this.db.multiResult(migrationSql);
-      this.logger.debug(_scope, 'executed migration sql', { version: v, results });
-      this.logger.info(_scope, 'applied migration', { version: v });
+      try {
+        const migrationSql = _queryFile(fPath);
+        this.logger.debug(_scope, 'applying migration', { version: v });
+        const results = await this.db.multiResult(migrationSql);
+        this.logger.debug(_scope, 'migration results', { results });
+        this.logger.info(_scope, 'applied migration', { version: v });
+      } catch (e) {
+        this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v });
+        throw e;
+      }
     }
   }
 
@@ -421,6 +427,21 @@ class DatabasePostgres extends Database {
   }
 
 
+  async subscriptionDeleteExpired(dbCtx, topicId) {
+    const _scope = _fileScope('subscriptionDeleteExpired');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    try {
+      const result = await dbCtx.result(this.statement.subscriptionDeleteExpired, { topicId });
+      this.logger.debug(_scope, 'success', { topicId, deleted: result.rowCount });
+      return this._engineInfo(result);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topicId });
+      throw e;
+    }
+  }
+
+
   async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
     const _scope = _fileScope('subscriptionDeliveryClaim');
     this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
@@ -458,14 +479,14 @@ class DatabasePostgres extends Database {
   }
 
 
-  async subscriptionDeliveryComplete(dbCtx, callback, topicId) {
+  async subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) {
     const _scope = _fileScope('subscriptionDeliveryComplete');
-    this.logger.debug(_scope, 'called', { callback, topicId });
+    this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated });
 
     let result;
     try {
       await dbCtx.txIf(async (txCtx) => {
-        result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId });
+        result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId, topicContentUpdated });
         if (result.rowCount != 1) {
           throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
         }
@@ -475,7 +496,7 @@ class DatabasePostgres extends Database {
         }
       });
     } catch (e) {
-      this.logger.error(_scope, 'failed', { error: e, callback, topicId });
+      this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated });
       throw e;
     }
   }
@@ -804,6 +825,37 @@ class DatabasePostgres extends Database {
   }
 
 
+  async topicPendingDelete(dbCtx, topicId) {
+    const _scope = _fileScope('topicPendingDelete');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    try {
+      await dbCtx.txIf(async (txCtx) => {
+        const topic = await txCtx.one(this.statement.topicGetById, { topicId });
+        if (!topic.isDeleted) {
+          this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
+          return;
+        }
+
+        const { count: subscriberCount } = await txCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl: topic.url });
+        if (subscriberCount) {
+          this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
+          return;
+        }
+
+        const result = await txCtx.result(this.statement.topicDeleteById, { topicId });
+        if (result.rowCount !== 1) {
+          throw new DBErrors.UnexpectedResult('did not delete topic');
+        }
+      });
+      this.logger.debug(_scope, 'success', { topicId });
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topicId });
+      throw e;
+    }
+  }
+
+
   async topicSet(dbCtx, data) {
     const _scope = _fileScope('topicSet');
     this.logger.debug(_scope, 'called', data);