database migration 1.0.4, store topic fetch etag/last-modified, provide these when...
[websub-hub] / src / db / postgres / index.js
index f4f690a18e3a95d18bb55c3596d1a9c2d797fc66..34511102ec5a0e596d0408e2a2bb8bfedeb94eab 100644 (file)
@@ -10,6 +10,7 @@ const pgp = require('pg-promise')(pgpInitOptions);
 const svh = require('../schema-version-helper');
 const Database = require('../base');
 const DBErrors = require('../errors');
+const Listener = require('./listener');
 const common = require('../../common');
 
 const _fileScope = common.fileScope(__filename);
@@ -29,7 +30,7 @@ const schemaVersionsSupported = {
   max: {
     major: 1,
     minor: 0,
-    patch: 0,
+    patch: 4,
   },
 };
 
@@ -43,10 +44,23 @@ class DatabasePostgres extends Database {
     // Suppress QF warnings when running tests
     this.noWarnings = options.db.noWarnings;
 
+    if (options.db.cacheEnabled) {
+      this.listener = new Listener(logger, this.db, Object.assign({}, options.db.listener, {
+        channel: 'topic_changed',
+        dataCallback: this._topicChanged.bind(this),
+        connectionEstablishedCallback: this._listenerEstablished.bind(this),
+        connectionLostCallback: this._listenerLost.bind(this),
+      }));
+    }
+
     // Log queries
     const queryLogLevel = options.db.queryLogLevel;
     if (queryLogLevel) {
       pgpInitOptions.query = (event) => {
+        // Quell outgoing pings
+        if (event && event.query && event.query.startsWith('NOTIFY')) {
+          return;
+        }
         this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event, ['query', 'params']) });
       };
     }
@@ -69,6 +83,10 @@ class DatabasePostgres extends Database {
         }
       }
       if (queryLogLevel) {
+        // Quell outgoing pings
+        if (result && result.command === 'NOTIFY') {
+          return;
+        }
         // Omitting .rows
         const resultLog = common.pick(result, ['command', 'rowCount', 'duration']);
         this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog });
@@ -86,6 +104,7 @@ class DatabasePostgres extends Database {
   _queryFileHelper(_pgp) {
     return (file) => {
       const _scope = _fileScope('_queryFile');
+      /* istanbul ignore next */
       const qfParams = {
         minify: true,
         ...(this.noWarnings && { noWarnings: this.noWarnings }),
@@ -100,13 +119,16 @@ class DatabasePostgres extends Database {
   }
 
 
-  async schemaCheck(applyMigrations = true) {
-    const _scope = _fileScope('schemaCheck');
+  async initialize(applyMigrations = true) {
+    const _scope = _fileScope('initialize');
     this.logger.debug(_scope, 'called', { applyMigrations });
     if (applyMigrations) {
       await this._initTables();
     }
-    await super.schemaCheck();
+    await super.initialize();
+    if (this.listener) {
+      await this.listener.start();
+    }
   }
 
 
@@ -140,10 +162,16 @@ class DatabasePostgres extends Database {
     this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
     for (const v of migrationsWanted) {
       const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
-      const migrationSql = _queryFile(fPath);
-      const results = await this.db.multiResult(migrationSql);
-      this.logger.debug(_scope, 'executed migration sql', { version: v, results });
-      this.logger.info(_scope, 'applied migration', { version: v });
+      try {
+        const migrationSql = _queryFile(fPath);
+        this.logger.debug(_scope, 'applying migration', { version: v });
+        const results = await this.db.multiResult(migrationSql);
+        this.logger.debug(_scope, 'migration results', { results });
+        this.logger.info(_scope, 'applied migration', { version: v });
+      } catch (e) {
+        this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v });
+        throw e;
+      }
     }
   }
 
@@ -173,6 +201,9 @@ class DatabasePostgres extends Database {
   async _closeConnection() {
     const _scope = _fileScope('_closeConnection');
     try {
+      if (this.listener) {
+        await this.listener.stop();
+      }
       await this._pgp.end();
     } catch (e) {
       this.logger.error(_scope, 'failed', { error: e });
@@ -181,6 +212,7 @@ class DatabasePostgres extends Database {
   }
 
   
+  /* istanbul ignore next */
   async _purgeTables(really = false) {
     const _scope = _fileScope('_purgeTables');
     try {
@@ -219,6 +251,77 @@ class DatabasePostgres extends Database {
   }
 
 
+  /**
+   * Receive notices when topic entry is updated.
+   * Clear relevant cache entry.
+   * @param {String} payload
+   */
+  _topicChanged(payload) {
+    const _scope = _fileScope('_topicChanged');
+    if (payload !== 'ping') {
+      this.logger.debug(_scope, 'called', { payload });
+      this.cache.delete(payload);
+    }
+  }
+
+
+  /**
+   * Called when a listener connection is opened.
+   * Enable cache.
+   */
+  _listenerEstablished() {
+    const _scope = _fileScope('_listenerEstablished');
+    this.logger.debug(_scope, 'called', {});
+    this.cache = new Map();
+  }
+
+
+  /**
+   * Called when a listener connection is closed.
+   * Disable cache.
+   */
+  _listenerLost() {
+    const _scope = _fileScope('_listenerLost');
+    this.logger.debug(_scope, 'called', {});
+    delete this.cache;
+  }
+
+
+  /**
+   * Return a cached entry, if available.
+   * @param {*} key
+   */
+  _cacheGet(key) {
+    const _scope = _fileScope('_cacheGet');
+    if (this.cache && this.cache.has(key)) {
+      const cacheEntry = this.cache.get(key);
+      this.logger.debug(_scope, 'found cache entry', { key, ...common.pick(cacheEntry, ['added', 'hits', 'lastHit']) });
+      cacheEntry.hits += 1;
+      cacheEntry.lastHit = new Date();
+      return cacheEntry.data;
+    }
+  }
+
+
+  /**
+   * Store an entry in cache, if available.
+   * @param {*} key
+   * @param {*} data
+   */
+  _cacheSet(key, data) {
+    const _scope = _fileScope('_cacheSet');
+    if (this.cache) {
+      this.cache.set(key, {
+        added: new Date(),
+        hits: 0,
+        lastHit: undefined,
+        data,
+      });
+      this.logger.debug(_scope, 'added cache entry', { key });
+    }
+  }
+
+
   async context(fn) {
     return this.db.task(async (t) => fn(t));
   }
@@ -324,6 +427,21 @@ class DatabasePostgres extends Database {
   }
 
 
+  async subscriptionDeleteExpired(dbCtx, topicId) {
+    const _scope = _fileScope('subscriptionDeleteExpired');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    try {
+      const result = await dbCtx.result(this.statement.subscriptionDeleteExpired, { topicId });
+      this.logger.debug(_scope, 'success', { topicId, deleted: result.rowCount });
+      return this._engineInfo(result);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topicId });
+      throw e;
+    }
+  }
+
+
   async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
     const _scope = _fileScope('subscriptionDeliveryClaim');
     this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
@@ -361,14 +479,14 @@ class DatabasePostgres extends Database {
   }
 
 
-  async subscriptionDeliveryComplete(dbCtx, callback, topicId) {
+  async subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) {
     const _scope = _fileScope('subscriptionDeliveryComplete');
-    this.logger.debug(_scope, 'called', { callback, topicId });
+    this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated });
 
     let result;
     try {
       await dbCtx.txIf(async (txCtx) => {
-        result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId });
+        result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId, topicContentUpdated });
         if (result.rowCount != 1) {
           throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
         }
@@ -378,7 +496,7 @@ class DatabasePostgres extends Database {
         }
       });
     } catch (e) {
-      this.logger.error(_scope, 'failed', { error: e, callback, topicId });
+      this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated });
       throw e;
     }
   }
@@ -642,7 +760,7 @@ class DatabasePostgres extends Database {
     let topics;
     try {
       topics = await dbCtx.manyOrNone(this.statement.topicGetInfoAll);
-      } catch (e) {
+    } catch (e) {
       this.logger.error(_scope, 'failed', { error: e, topics });
       throw e;
     }
@@ -692,8 +810,14 @@ class DatabasePostgres extends Database {
 
     let topic;
     try {
+      topic = this._cacheGet(topicId);
+      if (topic) {
+        return topic;
+      }
       topic = await dbCtx.oneOrNone(this.statement.topicGetContentById, { topicId });
-      return this._topicDefaults(topic);
+      const topicWithDefaults = this._topicDefaults(topic);
+      this._cacheSet(topicId, topicWithDefaults);
+      return topicWithDefaults;
     } catch (e) {
       this.logger.error(_scope, 'failed', { error: e, topic, topicId });
       throw e;
@@ -701,6 +825,49 @@ class DatabasePostgres extends Database {
   }
 
 
+  async topicPendingDelete(dbCtx, topicId) {
+    const _scope = _fileScope('topicPendingDelete');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    try {
+      await dbCtx.txIf(async (txCtx) => {
+        const topic = await txCtx.one(this.statement.topicGetById, { topicId });
+        if (!topic.isDeleted) {
+          this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
+          return;
+        }
+
+        const { count: subscriberCount } = await txCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl: topic.url });
+        if (subscriberCount) {
+          this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
+          return;
+        }
+
+        const result = await txCtx.result(this.statement.topicDeleteById, { topicId });
+        if (result.rowCount !== 1) {
+          throw new DBErrors.UnexpectedResult('did not delete topic');
+        }
+      });
+      this.logger.debug(_scope, 'success', { topicId });
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topicId });
+      throw e;
+    }
+  }
+
+
+  async topicPublishHistory(dbCtx, topicId, days) {
+    const _scope = _fileScope('topicPublishHistory');
+    this.logger.debug(_scope, 'called', { topicId, days });
+
+    const events = await dbCtx.manyOrNone(this.statement.topicPublishHistory, { topicIds: [topicId], daysAgo: days });
+    const history = Array.from({ length: days }, () => 0);
+    events.forEach(({ daysAgo, contentUpdates }) => history[daysAgo] = Number(contentUpdates));
+
+    return history;
+  }
+
+
   async topicSet(dbCtx, data) {
     const _scope = _fileScope('topicSet');
     this.logger.debug(_scope, 'called', data);
@@ -733,6 +900,8 @@ class DatabasePostgres extends Database {
     const _scope = _fileScope('topicSetContent');
     const topicSetContentData = {
       contentType: null,
+      httpETag: null,
+      httpLastModified: null,
       ...data,
     };
     const logData = {
@@ -749,6 +918,14 @@ class DatabasePostgres extends Database {
       if (result.rowCount !=  1) {
         throw new DBErrors.UnexpectedResult('did not set topic content');
       }
+      result = await dbCtx.result(this.statement.topicSetContentHistory, {
+        topicId: data.topicId,
+        contentHash: data.contentHash,
+        contentSize: data.content.length,
+      });
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not set topic content history');
+      }
       this.logger.debug(_scope, 'success', { ...logData });
       return this._engineInfo(result);
     } catch (e) {