update dependencies, fixes to support new authentication features
[websub-hub] / src / db / postgres / index.js
index 213fa1031c3c44fd29c48a46a8af86ff8bed6217..78a4dab6fb775bdf69ba8faaea25fc5e129d6914 100644 (file)
@@ -29,8 +29,8 @@ const schemaVersionsSupported = {
   },
   max: {
     major: 1,
-    minor: 0,
-    patch: 1,
+    minor: 1,
+    patch: 0,
   },
 };
 
@@ -45,12 +45,13 @@ class DatabasePostgres extends Database {
     this.noWarnings = options.db.noWarnings;
 
     if (options.db.cacheEnabled) {
-      this.listener = new Listener(logger, this.db, Object.assign({}, options.db.listener, {
+      this.listener = new Listener(logger, this.db, {
+        ...options.db.listener,
         channel: 'topic_changed',
         dataCallback: this._topicChanged.bind(this),
         connectionEstablishedCallback: this._listenerEstablished.bind(this),
         connectionLostCallback: this._listenerLost.bind(this),
-      }));
+      });
     }
 
     // Log queries
@@ -58,20 +59,22 @@ class DatabasePostgres extends Database {
     if (queryLogLevel) {
       pgpInitOptions.query = (event) => {
         // Quell outgoing pings
-        if (event && event.query && event.query.startsWith('NOTIFY')) {
+        if (event?.query?.startsWith('NOTIFY')) {
           return;
         }
-        this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event, ['query', 'params']) });
+        this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event || {}, ['query', 'params']) });
       };
     }
 
     // Log errors
     pgpInitOptions.error = (err, event) => {
       this.logger.error(_fileScope('pgp:error'), '', { err, event });
+
+      // TODO: close connection on err.code === '57P03' database shutting down
     };
 
     // Deophidiate column names in-place, log results
-    pgpInitOptions.receive = (data, result, event) => {
+    pgpInitOptions.receive = ({ data, result, ctx: event }) => {
       const exemplaryRow = data[0];
       for (const prop in exemplaryRow) {
         const camel = Database._camelfy(prop);
@@ -88,7 +91,7 @@ class DatabasePostgres extends Database {
           return;
         }
         // Omitting .rows
-        const resultLog = common.pick(result, ['command', 'rowCount', 'duration']);
+        const resultLog = common.pick(result || {}, ['command', 'rowCount', 'duration']);
         this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog });
       }
     };
@@ -162,10 +165,16 @@ class DatabasePostgres extends Database {
     this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
     for (const v of migrationsWanted) {
       const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
-      const migrationSql = _queryFile(fPath);
-      const results = await this.db.multiResult(migrationSql);
-      this.logger.debug(_scope, 'executed migration sql', { version: v, results });
-      this.logger.info(_scope, 'applied migration', { version: v });
+      try {
+        const migrationSql = _queryFile(fPath);
+        this.logger.debug(_scope, 'applying migration', { version: v });
+        const results = await this.db.multiResult(migrationSql);
+        this.logger.debug(_scope, 'migration results', { results });
+        this.logger.info(_scope, 'applied migration', { version: v });
+      } catch (e) {
+        this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v });
+        throw e;
+      }
     }
   }
 
@@ -248,7 +257,7 @@ class DatabasePostgres extends Database {
   /**
    * Receive notices when topic entry is updated.
    * Clear relevant cache entry.
-   * @param {String} payload
+   * @param {string} payload topic changed event
    */
   _topicChanged(payload) {
     const _scope = _fileScope('_topicChanged');
@@ -283,11 +292,12 @@ class DatabasePostgres extends Database {
 
   /**
    * Return a cached entry, if available.
-   * @param {*} key
+   * @param {*} key key
+   * @returns {object=} cached data
    */
   _cacheGet(key) {
     const _scope = _fileScope('_cacheGet');
-    if (this.cache && this.cache.has(key)) {
+    if (this.cache?.has(key)) {
       const cacheEntry = this.cache.get(key);
       this.logger.debug(_scope, 'found cache entry', { key, ...common.pick(cacheEntry, ['added', 'hits', 'lastHit']) });
       cacheEntry.hits += 1;
@@ -299,8 +309,8 @@ class DatabasePostgres extends Database {
 
   /**
    * Store an entry in cache, if available.
-   * @param {*} key
-   * @param {*} data
+   * @param {*} key key
+   * @param {*} data data
    */
   _cacheSet(key, data) {
     const _scope = _fileScope('_cacheSet');
@@ -359,19 +369,56 @@ class DatabasePostgres extends Database {
   }
 
 
-  async authenticationUpsert(dbCtx, identifier, credential) {
+  async authenticationUpsert(dbCtx, identifier, credential, otpKey) {
     const _scope = _fileScope('authenticationUpsert');
     const scrubbedCredential = '*'.repeat((credential || '').length);
-    this.logger.debug(_scope, 'called', { identifier, scrubbedCredential });
+    const scrubbedOTPKey = '*'.repeat((otpKey || '').length) || null;
+    this.logger.debug(_scope, 'called', { identifier, scrubbedCredential, scrubbedOTPKey });
 
     let result;
     try {
-      result = await dbCtx.result(this.statement.authenticationUpsert, { identifier, credential });
+      result = await dbCtx.result(this.statement.authenticationUpsert, { identifier, credential, otpKey });
       if (result.rowCount != 1) {
         throw new DBErrors.UnexpectedResult('did not upsert authentication');
       }
     } catch (e) {
-      this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential })
+      this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential, scrubbedOTPKey });
+      throw e;
+    }
+  }
+
+
+  async authenticationUpdateCredential(dbCtx, identifier, credential) {
+    const _scope = _fileScope('authenticationUpdateCredential');
+    const scrubbedCredential = '*'.repeat((credential || '').length);
+    this.logger.debug(_scope, 'called', { identifier, scrubbedCredential });
+
+    let result;
+    try {
+      result = await dbCtx.result(this.statement.authenticationUpdateCredential, { identifier, credential });
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not update authentication credential');
+      }
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential });
+      throw e;
+    }
+  }
+
+
+  async authenticationUpdateOTPKey(dbCtx, identifier, otpKey) {
+    const _scope = _fileScope('authenticationUpdateOTPKey');
+    const scrubbedOTPKey = '*'.repeat((otpKey || '').length) || null;
+    this.logger.debug(_scope, 'called', { identifier, scrubbedOTPKey });
+
+    let result;
+    try {
+      result = await dbCtx.result(this.statement.authenticationUpdateOtpKey, { identifier, otpKey });
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not update authentication otp key');
+      }
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedOTPKey });
       throw e;
     }
   }
@@ -473,14 +520,14 @@ class DatabasePostgres extends Database {
   }
 
 
-  async subscriptionDeliveryComplete(dbCtx, callback, topicId) {
+  async subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) {
     const _scope = _fileScope('subscriptionDeliveryComplete');
-    this.logger.debug(_scope, 'called', { callback, topicId });
+    this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated });
 
     let result;
     try {
       await dbCtx.txIf(async (txCtx) => {
-        result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId });
+        result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId, topicContentUpdated });
         if (result.rowCount != 1) {
           throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
         }
@@ -490,7 +537,7 @@ class DatabasePostgres extends Database {
         }
       });
     } catch (e) {
-      this.logger.error(_scope, 'failed', { error: e, callback, topicId });
+      this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated });
       throw e;
     }
   }
@@ -783,14 +830,17 @@ class DatabasePostgres extends Database {
   }
 
 
-  async topicGetByUrl(dbCtx, topicUrl) {
+  async topicGetByUrl(dbCtx, topicUrl, applyDefaults = true) {
     const _scope = _fileScope('topicGetByUrl');
     this.logger.debug(_scope, 'called', { topicUrl });
 
     let topic;
     try {
       topic = await dbCtx.oneOrNone(this.statement.topicGetByUrl, { topicUrl });
-      return this._topicDefaults(topic);
+      if (applyDefaults) {
+        topic = this._topicDefaults(topic);
+      }
+      return topic;
     } catch (e) {
       this.logger.error(_scope, 'failed', { error: e, topic, topicUrl });
       throw e;
@@ -850,6 +900,18 @@ class DatabasePostgres extends Database {
   }
 
 
+  async topicPublishHistory(dbCtx, topicId, days) {
+    const _scope = _fileScope('topicPublishHistory');
+    this.logger.debug(_scope, 'called', { topicId, days });
+
+    const events = await dbCtx.manyOrNone(this.statement.topicPublishHistory, { topicIds: [topicId], daysAgo: days });
+    const history = Array.from({ length: days }, () => 0);
+    events.forEach(({ daysAgo, contentUpdates }) => history[daysAgo] = Number(contentUpdates));
+
+    return history;
+  }
+
+
   async topicSet(dbCtx, data) {
     const _scope = _fileScope('topicSet');
     this.logger.debug(_scope, 'called', data);
@@ -882,6 +944,8 @@ class DatabasePostgres extends Database {
     const _scope = _fileScope('topicSetContent');
     const topicSetContentData = {
       contentType: null,
+      httpETag: null,
+      httpLastModified: null,
       ...data,
     };
     const logData = {
@@ -898,6 +962,14 @@ class DatabasePostgres extends Database {
       if (result.rowCount !=  1) {
         throw new DBErrors.UnexpectedResult('did not set topic content');
       }
+      result = await dbCtx.result(this.statement.topicSetContentHistory, {
+        topicId: data.topicId,
+        contentHash: data.contentHash,
+        contentSize: data.content.length,
+      });
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not set topic content history');
+      }
       this.logger.debug(_scope, 'success', { ...logData });
       return this._engineInfo(result);
     } catch (e) {