database migration 1.0.4, store topic fetch etag/last-modified, provide these when...
[websub-hub] / src / db / sqlite / index.js
index 775708ddcb93ccfa97edf0c75b07e63e2888856c..56afa000cb886f900277e4bf819139b93e926707 100644 (file)
@@ -20,12 +20,14 @@ const schemaVersionsSupported = {
   max: {
     major: 1,
     minor: 0,
-    patch: 0,
+    patch: 4,
   },
 };
 
 // max of signed int64 (2^63 - 1), should be enough
 const EPOCH_FOREVER = BigInt('9223372036854775807');
+const epochToDate = (epoch) => new Date(Number(epoch) * 1000);
+const dateToEpoch = (date) => Math.round(date.getTime() / 1000);
 
 class DatabaseSQLite extends Database {
   constructor(logger, options) {
@@ -85,10 +87,17 @@ class DatabaseSQLite extends Database {
     this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
     migrationsWanted.forEach((v) => {
       const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
-      // eslint-disable-next-line security/detect-non-literal-fs-filename
-      const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
-      this.logger.info(_scope, 'applying migration', { version: v });
-      this.db.exec(fSql);
+      try {
+        // eslint-disable-next-line security/detect-non-literal-fs-filename
+        const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
+        this.logger.debug(_scope, 'applying migration', { version: v });
+        const results = this.db.exec(fSql);
+        this.logger.debug(_scope, 'migration results', { results });
+        this.logger.info(_scope, 'applied migration', { version: v });
+      } catch (e) {
+        this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v });
+        throw e;
+      }
     });
   }
 
@@ -299,12 +308,28 @@ class DatabaseSQLite extends Database {
   }
 
 
+  /**
+   * Converts engine subscription fields to native types.
+   * @param {Object} data
+   */
+  static _subscriptionDataToNative(data) {
+    if (data) {
+      ['created', 'verified', 'expires', 'contentDelivered'].forEach((field) => {
+        // eslint-disable-next-line security/detect-object-injection
+        data[field] = epochToDate(data[field]);
+      });
+    }
+    return data;
+  }
+
+
   subscriptionsByTopicId(dbCtx, topicId) {
     const _scope = _fileScope('subscriptionsByTopicId');
     this.logger.debug(_scope, 'called', { topicId });
 
     try {
-      return this.statement.subscriptionsByTopicId.all({ topicId });
+      const subscriptions = this.statement.subscriptionsByTopicId.all({ topicId });
+      return subscriptions.map((s) => DatabaseSQLite._subscriptionDataToNative(s));
     } catch (e) {
       this.logger.error(_scope, 'failed', { error: e, topicId });
       throw e;
@@ -342,6 +367,21 @@ class DatabaseSQLite extends Database {
   }
 
 
+  subscriptionDeleteExpired(dbCtx, topicId) {
+    const _scope = _fileScope('subscriptionDeleteExpired');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    try {
+      const result = this.statement.subscriptionDeleteExpired.run({ topicId });
+      this.logger.debug(_scope, 'success', { topicId, deleted: result.changes });
+      return this._engineInfo(result);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topicId });
+      throw e;
+    }
+  }
+
+
   subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
     const _scope = _fileScope('subscriptionDeliveryClaim');
     this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
@@ -382,14 +422,15 @@ class DatabaseSQLite extends Database {
   }
 
 
-  subscriptionDeliveryComplete(dbCtx, callback, topicId) {
+  subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) {
     const _scope = _fileScope('subscriptionDeliveryComplete');
-    this.logger.debug(_scope, 'called', { callback, topicId });
+    this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated });
 
     let result;
     try {
       this.db.transaction(() => {
-        result = this.statement.subscriptionDeliverySuccess.run({ callback, topicId });
+        topicContentUpdated = dateToEpoch(topicContentUpdated);
+        result = this.statement.subscriptionDeliverySuccess.run({ callback, topicId, topicContentUpdated });
         if (result.changes != 1) {
           throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
         }
@@ -400,7 +441,7 @@ class DatabaseSQLite extends Database {
       })();
       return this._engineInfo(result);
     } catch (e) {
-      this.logger.error(_scope, 'failed', { error: e, callback, topicId });
+      this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated });
       throw e;
     }
   }
@@ -463,7 +504,7 @@ class DatabaseSQLite extends Database {
     let subscription;
     try {
       subscription = this.statement.subscriptionGet.get({ callback, topicId });
-      return subscription;
+      return DatabaseSQLite._subscriptionDataToNative(subscription);
     } catch (e) {
       this.logger.error(_scope, 'failed', { error: e, callback, topicId });
       throw e;
@@ -478,7 +519,7 @@ class DatabaseSQLite extends Database {
     let subscription;
     try {
       subscription = this.statement.subscriptionGetById.get({ subscriptionId });
-      return subscription;
+      return DatabaseSQLite._subscriptionDataToNative(subscription);
     } catch (e) {
       this.logger.error(_scope, 'failed', { error: e, subscriptionId });
       throw e;
@@ -666,7 +707,6 @@ class DatabaseSQLite extends Database {
    * @param {Object} data
    */
   static _topicDataToNative(data) {
-    const epochToDate = (epoch) => new Date(Number(epoch) * 1000);
     if (data) {
       data.isActive = !!data.isActive;
       data.isDeleted = !!data.isDeleted;
@@ -687,7 +727,7 @@ class DatabaseSQLite extends Database {
     let topics;
     try {
       topics = this.statement.topicGetInfoAll.all();
-      } catch (e) {
+    } catch (e) {
       this.logger.error(_scope, 'failed', { error: e, topics });
       throw e;
     }
@@ -751,6 +791,49 @@ class DatabaseSQLite extends Database {
   }
 
 
+  topicPendingDelete(dbCtx, topicId) {
+    const _scope = _fileScope('topicPendingDelete');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    try {
+      this.db.transaction(() => {
+        const topic = this.statement.topicGetById.get({ topicId });
+        if (!topic.isDeleted) {
+          this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
+          return;
+        }
+
+        const { count: subscriberCount } = this.statement.subscriptionCountByTopicUrl.get({ topicUrl: topic.url });
+        if (subscriberCount) {
+          this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
+          return;
+        }
+
+        const result = this.statement.topicDeleteById.run({ topicId });
+        if (result.changes !== 1) {
+          throw new DBErrors.UnexpectedResult('did not delete topic');
+        }
+      })();
+      this.logger.debug(_scope, 'success', { topicId });
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topicId });
+      throw e;
+    }
+  }
+
+
+  topicPublishHistory(dbCtx, topicId, days) {
+    const _scope = _fileScope('topicPublishHistory');
+    this.logger.debug(_scope, 'called', { topicId, days })
+
+    const events = this.statement.topicPublishHistory.all({ topicId, daysAgo: days });
+    const history = Array.from({ length: days }, () => 0);
+    events.forEach(({ daysAgo, contentUpdates }) => history[daysAgo] = Number(contentUpdates));
+
+    return history;
+  }
+
+
   topicSet(dbCtx, data) {
     const _scope = _fileScope('topicSet');
     this.logger.debug(_scope, 'called', data);
@@ -782,6 +865,8 @@ class DatabaseSQLite extends Database {
     const _scope = _fileScope('topicSetContent');
     const topicSetContentData = {
       contentType: null,
+      httpETag: null,
+      httpLastModified: null,
       ...data,
     };
     const logData = {
@@ -798,6 +883,14 @@ class DatabaseSQLite extends Database {
       if (result.changes !=  1) {
         throw new DBErrors.UnexpectedResult('did not set topic content');
       }
+      result = this.statement.topicSetContentHistory.run({
+        topicId: data.topicId,
+        contentHash: data.contentHash,
+        contentSize: data.content.length,
+      });
+      if (result.changes != 1) {
+        throw new DBErrors.UnexpectedResult('did not set topic content history');
+      }
       return this._engineInfo(result);
     } catch (e) {
       this.logger.error(_scope, 'failed', { error: e, ...logData });