Initial release
[websub-hub] / src / db / postgres / index.js
diff --git a/src/db/postgres/index.js b/src/db/postgres/index.js
new file mode 100644 (file)
index 0000000..f4f690a
--- /dev/null
@@ -0,0 +1,973 @@
+/* eslint-disable security/detect-object-injection */
+'use strict';
+
+const pgpInitOptions = {
+  capSQL: true,
+};
+
+const path = require('path');
+const pgp = require('pg-promise')(pgpInitOptions);
+const svh = require('../schema-version-helper');
+const Database = require('../base');
+const DBErrors = require('../errors');
+const common = require('../../common');
+
+const _fileScope = common.fileScope(__filename);
+
+const PGTypeIdINT8 = 20; // Type Id 20 == INT8 (BIGINT)
+const PGTYpeIdINT8Array = 1016; //Type Id 1016 == INT8[] (BIGINT[])
+pgp.pg.types.setTypeParser(PGTypeIdINT8, BigInt); // Type Id 20 = INT8 (BIGINT)
+const parseBigIntArray = pgp.pg.types.getTypeParser(PGTYpeIdINT8Array); // Type Id 1016 = INT8[] (BIGINT[])
+pgp.pg.types.setTypeParser(PGTYpeIdINT8Array, (a) => parseBigIntArray(a).map(BigInt));
+
+const schemaVersionsSupported = {
+  min: {
+    major: 1,
+    minor: 0,
+    patch: 0,
+  },
+  max: {
+    major: 1,
+    minor: 0,
+    patch: 0,
+  },
+};
+
+class DatabasePostgres extends Database {
+  constructor(logger, options, _pgp = pgp) {
+    super(logger, options);
+
+    this.db = _pgp(options.db.connectionString);
+    this.schemaVersionsSupported = schemaVersionsSupported;
+
+    // Suppress QF warnings when running tests
+    this.noWarnings = options.db.noWarnings;
+
+    // Log queries
+    const queryLogLevel = options.db.queryLogLevel;
+    if (queryLogLevel) {
+      pgpInitOptions.query = (event) => {
+        this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event, ['query', 'params']) });
+      };
+    }
+
+    // Log errors
+    pgpInitOptions.error = (err, event) => {
+      this.logger.error(_fileScope('pgp:error'), '', { err, event });
+    };
+
+    // Deophidiate column names in-place, log results
+    pgpInitOptions.receive = (data, result, event) => {
+      const exemplaryRow = data[0];
+      for (const prop in exemplaryRow) {
+        const camel = Database._camelfy(prop);
+        if (!(camel in exemplaryRow)) {
+          for (const d of data) {
+            d[camel] = d[prop];
+            delete d[prop];
+          }
+        }
+      }
+      if (queryLogLevel) {
+        // Omitting .rows
+        const resultLog = common.pick(result, ['command', 'rowCount', 'duration']);
+        this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog });
+      }
+    };
+
+    // Expose these for test coverage
+    this.pgpInitOptions = pgpInitOptions;
+    this._pgp = _pgp;
+
+    this._initStatements(_pgp);
+  }
+
+
+  _queryFileHelper(_pgp) {
+    return (file) => {
+      const _scope = _fileScope('_queryFile');
+      const qfParams = {
+        minify: true,
+        ...(this.noWarnings && { noWarnings: this.noWarnings }),
+      };
+      const qf = new _pgp.QueryFile(file, qfParams);
+      if (qf.error) {
+        this.logger.error(_scope, 'failed to create SQL statement', { error: qf.error, file });
+        throw qf.error;
+      }
+      return qf;
+    };
+  }
+
+
+  async schemaCheck(applyMigrations = true) {
+    const _scope = _fileScope('schemaCheck');
+    this.logger.debug(_scope, 'called', { applyMigrations });
+    if (applyMigrations) {
+      await this._initTables();
+    }
+    await super.schemaCheck();
+  }
+
+
+  async _initTables(_pgp) {
+    const _scope = _fileScope('_initTables');
+    this.logger.debug(_scope, 'called', {});
+
+    const _queryFile = this._queryFileHelper(_pgp || this._pgp);
+
+    // Migrations rely upon this table, ensure it exists.
+    const metaVersionTable = '_meta_schema_version';
+
+    const tableExists = async (name) => this.db.oneOrNone('SELECT table_name FROM information_schema.tables WHERE table_name=$(name)', { name });
+    let metaExists = await tableExists(metaVersionTable);
+    if (!metaExists) {
+      const fPath = path.join(__dirname, 'sql', 'schema', 'init.sql');
+      const initSql = _queryFile(fPath);
+      const results = await this.db.multiResult(initSql);
+      this.logger.debug(_scope, 'executed init sql', { results });
+      metaExists = await tableExists(metaVersionTable);
+      /* istanbul ignore if */
+      if (!metaExists) {
+        throw new DBErrors.UnexpectedResult(`did not create ${metaVersionTable} table`);
+      }
+      this.logger.info(_scope, 'created schema version table', { metaVersionTable });
+    }
+
+    // Apply migrations
+    const currentSchema = await this._currentSchema();
+    const migrationsWanted = svh.unappliedSchemaVersions(__dirname, currentSchema, this.schemaVersionsSupported);
+    this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
+    for (const v of migrationsWanted) {
+      const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
+      const migrationSql = _queryFile(fPath);
+      const results = await this.db.multiResult(migrationSql);
+      this.logger.debug(_scope, 'executed migration sql', { version: v, results });
+      this.logger.info(_scope, 'applied migration', { version: v });
+    }
+  }
+
+  
+  _initStatements(_pgp) {
+    const _scope = _fileScope('_initStatements');
+    const _queryFile = this._queryFileHelper(_pgp);
+    this.statement = _pgp.utils.enumSql(path.join(__dirname, 'sql'), {}, _queryFile);
+    this.logger.debug(_scope, 'statements initialized', { statements: Object.keys(this.statement).length });
+  }
+
+  
+  async healthCheck() {
+    const _scope = _fileScope('healthCheck');
+    this.logger.debug(_scope, 'called', {});
+    const c = await this.db.connect();
+    c.done();
+    return { serverVersion: c.client.serverVersion };
+  }
+
+
+  async _currentSchema() {
+    return this.db.one('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1');
+  }
+
+  
+  async _closeConnection() {
+    const _scope = _fileScope('_closeConnection');
+    try {
+      await this._pgp.end();
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e });
+      throw e;
+    }
+  }
+
+  
+  async _purgeTables(really = false) {
+    const _scope = _fileScope('_purgeTables');
+    try {
+      if (really) {
+        await this.db.tx(async (t) => {
+          await t.batch([
+            'topic',
+            // 'topic_fetch_in_progress',
+            // 'verification',
+            // 'verification_in_progress',
+            // 'subscription',
+            // 'subscription_delivery_in_progress',
+          ].map(async (table) => t.query('TRUNCATE TABLE $(table:name) CASCADE', { table })));
+        });
+      }
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e });
+      throw e;
+    }
+  }
+
+
+  // eslint-disable-next-line class-methods-use-this
+  _engineInfo(result) {
+    return {
+      changes: result.rowCount,
+      lastInsertRowid: result.rows.length ? result.rows[0].id : undefined,
+      duration: result.duration,
+    };
+  }
+
+
+  // eslint-disable-next-line class-methods-use-this
+  _resultLog(result) {
+    return common.pick(result, ['command', 'rowCount', 'duration']);
+  }
+
+
+  async context(fn) {
+    return this.db.task(async (t) => fn(t));
+  }
+
+
+  // eslint-disable-next-line class-methods-use-this
+  async transaction(dbCtx, fn) {
+    return dbCtx.txIf(async (t) => fn(t));
+  }
+
+
+  async authenticationSuccess(dbCtx, identifier) {
+    const _scope = _fileScope('authenticationSuccess');
+    this.logger.debug(_scope, 'called', { identifier });
+
+    let result;
+    try {
+      result = await dbCtx.result(this.statement.authenticationSuccess, { identifier });
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not update authentication success event');
+      }
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, identifier });
+      throw e;
+    }
+  }
+
+
+  async authenticationGet(dbCtx, identifier) {
+    const _scope = _fileScope('authenticationGet');
+    this.logger.debug(_scope, 'called', { identifier });
+
+    let auth;
+    try {
+      auth = await dbCtx.oneOrNone(this.statement.authenticationGet, { identifier });
+      return auth;
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, identifier });
+      throw e;
+    }
+  }
+
+
+  async authenticationUpsert(dbCtx, identifier, credential) {
+    const _scope = _fileScope('authenticationUpsert');
+    const scrubbedCredential = '*'.repeat((credential || '').length);
+    this.logger.debug(_scope, 'called', { identifier, scrubbedCredential });
+
+    let result;
+    try {
+      result = await dbCtx.result(this.statement.authenticationUpsert, { identifier, credential });
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not upsert authentication');
+      }
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential })
+      throw e;
+    }
+  }
+
+
+  async subscriptionsByTopicId(dbCtx, topicId) {
+    const _scope = _fileScope('subscriptionsByTopicId');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    let count;
+    try {
+      count = await dbCtx.manyOrNone(this.statement.subscriptionsByTopicId, { topicId });
+      return count;
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topicId });
+      throw e;
+    }
+  }
+
+
+  async subscriptionCountByTopicUrl(dbCtx, topicUrl) {
+    const _scope = _fileScope('subscriptionCountByTopicUrl');
+    this.logger.debug(_scope, 'called', { topicUrl });
+
+    let count;
+    try {
+      count = await dbCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl });
+      return count;
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topicUrl });
+      throw e;
+    }
+  }
+
+
+  async subscriptionDelete(dbCtx, callback, topicId) {
+    const _scope = _fileScope('subscriptionDelete');
+    this.logger.debug(_scope, 'called', { callback, topicId });
+
+    try {
+      const result = await dbCtx.result(this.statement.subscriptionDelete, { callback, topicId });
+      return this._engineInfo(result);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, callback, topicId });
+      throw e;
+    }
+  }
+
+
+  async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
+    const _scope = _fileScope('subscriptionDeliveryClaim');
+    this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
+
+    try {
+      const claims = await dbCtx.txIf(async (txCtx) => {
+        return txCtx.manyOrNone(this.statement.subscriptionDeliveryClaim, { claimant, wanted, claimTimeoutSeconds });
+      });
+      return claims.map((r) => r.id);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, claimant, wanted, claimTimeoutSeconds });
+      throw e;
+    }
+  }
+
+
+  async subscriptionDeliveryClaimById(dbCtx, subscriptionId, claimTimeoutSeconds, claimant) {
+    const _scope = _fileScope('subscriptionDeliveryClaimById');
+    this.logger.debug(_scope, 'called', { subscriptionId, claimTimeoutSeconds, claimant });
+
+    let result;
+    try {
+      result = await dbCtx.txIf(async (txCtx) => {
+        result = await txCtx.result(this.statement.subscriptionDeliveryClaimById, { claimant, subscriptionId, claimTimeoutSeconds });
+        if (result.rowCount != 1) {
+          throw new DBErrors.UnexpectedResult('did not claim subscription delivery');
+        }
+        return result;
+      });
+      return this._engineInfo(result);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, claimant, subscriptionId, claimTimeoutSeconds });
+      throw e;
+    }
+  }
+
+
+  async subscriptionDeliveryComplete(dbCtx, callback, topicId) {
+    const _scope = _fileScope('subscriptionDeliveryComplete');
+    this.logger.debug(_scope, 'called', { callback, topicId });
+
+    let result;
+    try {
+      await dbCtx.txIf(async (txCtx) => {
+        result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId });
+        if (result.rowCount != 1) {
+          throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
+        }
+        result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
+        if (result.rowCount != 1) {
+          throw new DBErrors.UnexpectedResult('did not release subscription delivery');
+        }
+      });
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, callback, topicId });
+      throw e;
+    }
+  }
+
+
+  async subscriptionDeliveryGone(dbCtx, callback, topicId) {
+    const _scope = _fileScope('subscriptionDeliveryGone');
+    this.logger.debug(_scope, 'called', { callback, topicId });
+
+    let result;
+    try {
+      await dbCtx.txIf(async (txCtx) => {
+        result = await txCtx.result(this.statement.subscriptionDelete, { callback, topicId });
+        if (result.rowCount != 1) {
+          throw new DBErrors.UnexpectedResult('did not delete subscription');
+        }
+        // Delete cascades to delivery
+        // result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
+        // if (result.rowCount != 1) {
+        //   throw new DBErrors.UnexpectedResult('did not release subscription delivery');
+        // }
+      });
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, callback, topicId });
+      throw e;
+    }
+  }
+
+
+  async subscriptionDeliveryIncomplete(dbCtx, callback, topicId, retryDelays = [60]) {
+    const _scope = _fileScope('subscriptionDeliveryIncomplete');
+    this.logger.debug(_scope, 'called', { callback, topicId, retryDelays });
+
+    let result;
+    try {
+      await dbCtx.txIf(async (txCtx) => {
+        const { currentAttempt } = await txCtx.one(this.statement.subscriptionDeliveryAttempts, { callback, topicId });
+        const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
+        result = await txCtx.result(this.statement.subscriptionDeliveryFailure, { nextAttemptDelaySeconds, callback, topicId });
+        if (result.rowCount != 1) {
+          throw new DBErrors.UnexpectedResult('did not set subscription delivery failure');
+        }
+        result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
+        if (result.rowCount != 1) {
+          throw new DBErrors.UnexpectedResult('did not release subscription delivery');
+        }
+      });
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, callback, topicId });
+      throw e;
+    }
+  }
+
+
+  async subscriptionGet(dbCtx, callback, topicId) {
+    const _scope = _fileScope('subscriptionGet');
+    this.logger.debug(_scope, 'called', { callback, topicId });
+
+    let subscription;
+    try {
+      subscription = await dbCtx.oneOrNone(this.statement.subscriptionGet, { callback, topicId });
+      return subscription;
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, callback, topicId });
+      throw e;
+    }
+  }
+
+
+  async subscriptionGetById(dbCtx, subscriptionId) {
+    const _scope = _fileScope('subscriptionGetById');
+    this.logger.debug(_scope, 'called', { subscriptionId });
+
+    let subscription;
+    try {
+      subscription = await dbCtx.oneOrNone(this.statement.subscriptionGetById, { subscriptionId });
+      return subscription;
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, subscriptionId });
+      throw e;
+    }
+  }
+
+
+  async subscriptionUpdate(dbCtx, data) {
+    const _scope = _fileScope('subscriptionUpdate');
+    this.logger.debug(_scope, 'called', { data });
+
+    const subscriptionData = {
+      ...data,
+    };
+
+    this._subscriptionUpdateDataValidate(subscriptionData);
+
+    let result;
+    try {
+      result = await dbCtx.result(this.statement.subscriptionUpdate, subscriptionData);
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not update subscription');
+      }
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, subscriptionData });
+      throw e;
+    }
+  }
+
+
+  async subscriptionUpsert(dbCtx, data) {
+    const _scope = _fileScope('subscriptionUpsert');
+    this.logger.debug(_scope, 'called', { ...data });
+
+    const subscriptionData = {
+      secret: null,
+      httpRemoteAddr: null,
+      httpFrom: null,
+      ...data,
+    };
+    this._subscriptionUpsertDataValidate(subscriptionData);
+
+    let result;
+    try {
+      result = await dbCtx.result(this.statement.subscriptionUpsert, subscriptionData);
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not upsert subscription');
+      }
+      return this._engineInfo(result);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, subscriptionData });
+      throw e;
+    }
+  }
+
+
+  async topicDeleted(dbCtx, topicId) {
+    const _scope = _fileScope('topicDeleted');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    let result;
+    try {
+      result = await dbCtx.result(this.statement.topicDeleted, { topicId });
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not update topic as deleted');
+      }
+    } catch (e) {
+      this.logger.error(_scope, 'failed to update topic as deleted', { error: e, topicId });
+      throw e;
+    }
+  }
+
+
+  async topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
+    const _scope = _fileScope('topicFetchClaim');
+    this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
+
+    let claims;
+    try {
+      await dbCtx.txIf(async (txCtx) => {
+        claims = await txCtx.manyOrNone(this.statement.topicContentFetchClaim, { claimant, wanted, claimTimeoutSeconds });
+      });
+      return claims.map((r) => r.id);
+    } catch (e) {
+      this.logger.error(_scope, 'failed to claim topics for fetch', { error: e });
+      throw e;
+    }
+  }
+
+
+  async topicFetchClaimById(dbCtx, topicId, claimTimeoutSeconds, claimant) {
+    const _scope = _fileScope('topicFetchClaimById');
+    this.logger.debug(_scope, 'called', { topicId, claimTimeoutSeconds, claimant });
+
+    let result;
+    try {
+      await dbCtx.txIf(async (txCtx) => {
+        result = await txCtx.result(this.statement.topicContentFetchClaimById, { topicId, claimant, claimTimeoutSeconds });
+      });
+      return this._engineInfo(result);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topicId });
+      throw e;
+    }
+  }
+
+
+  async topicFetchComplete(dbCtx, topicId) {
+    const _scope = _fileScope('topicFetchComplete');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    let result;
+    try {
+      await dbCtx.txIf(async (txCtx) => {
+        result = await txCtx.result(this.statement.topicAttemptsReset, { topicId });
+        if (result.rowCount != 1) {
+          throw new DBErrors.UnexpectedResult('did not reset topic attempts');
+        }
+        result = await txCtx.result(this.statement.topicContentFetchDone, { topicId });
+        if (result.rowCount != 1) {
+          throw new DBErrors.UnexpectedResult('did not release topic fetch');
+        }
+      });
+      this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
+      return this._engineInfo(result);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, result, topicId });
+      throw e;
+    }
+  }
+
+
+  async topicFetchIncomplete(dbCtx, topicId, retryDelays = [60]) {
+    const _scope = _fileScope('topicFetchIncomplete');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    let result;
+    try {
+      result = await dbCtx.txIf(async (txCtx) => {
+        const { contentFetchAttemptsSinceSuccess: currentAttempt } = await txCtx.one(this.statement.topicAttempts, { topicId });
+        const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
+        result = await txCtx.result(this.statement.topicAttemptsIncrement, { topicId, nextAttemptDelaySeconds });
+        if (result.rowCount != 1) {
+          throw new DBErrors.UnexpectedResult('did not set topic attempts');
+        }
+        result = await txCtx.result(this.statement.topicContentFetchDone, { topicId });
+        if (result.rowCount != 1) {
+          throw new DBErrors.UnexpectedResult('did not release topic fetch');
+        }
+        return result;
+      });
+      this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
+      return this._engineInfo(result);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, result, topicId });
+      throw e;
+    }
+  }
+
+
+  async topicFetchRequested(dbCtx, topicId) {
+    const _scope = _fileScope('topicFetchRequested');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    let result;
+    try {
+      result = await dbCtx.result(this.statement.topicContentFetchRequested, { topicId });
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not set topic fetch requested');
+      }
+      this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
+      return this._engineInfo(result);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topicId });
+      throw e;
+    }
+  }
+
+
+  async topicGetAll(dbCtx) {
+    const _scope = _fileScope('topicGetAll');
+    this.logger.debug(_scope, 'called');
+
+    let topics;
+    try {
+      topics = await dbCtx.manyOrNone(this.statement.topicGetInfoAll);
+      } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topics });
+      throw e;
+    }
+    if (topics) {
+      topics = topics.map(this._topicDefaults.bind(this));
+    }
+    return topics;
+  }
+
+
+  async topicGetById(dbCtx, topicId, applyDefaults = true) {
+    const _scope = _fileScope('topicGetById');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    let topic;
+    try {
+      topic = await dbCtx.oneOrNone(this.statement.topicGetById, { topicId });
+      if (applyDefaults) {
+        topic = this._topicDefaults(topic);
+      }
+      return topic;
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topic, topicId });
+      throw e;
+    }
+  }
+
+
+  async topicGetByUrl(dbCtx, topicUrl) {
+    const _scope = _fileScope('topicGetByUrl');
+    this.logger.debug(_scope, 'called', { topicUrl });
+
+    let topic;
+    try {
+      topic = await dbCtx.oneOrNone(this.statement.topicGetByUrl, { topicUrl });
+      return this._topicDefaults(topic);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topic, topicUrl });
+      throw e;
+    }
+  }
+
+
+  async topicGetContentById(dbCtx, topicId) {
+    const _scope = _fileScope('topicGetContentById');
+    this.logger.debug(_scope, 'called', { topicId });
+
+    let topic;
+    try {
+      topic = await dbCtx.oneOrNone(this.statement.topicGetContentById, { topicId });
+      return this._topicDefaults(topic);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topic, topicId });
+      throw e;
+    }
+  }
+
+
+  async topicSet(dbCtx, data) {
+    const _scope = _fileScope('topicSet');
+    this.logger.debug(_scope, 'called', data);
+
+    const topicSetData = {
+      publisherValidationUrl: null,
+      leaseSecondsPreferred: null,
+      leaseSecondsMin: null,
+      leaseSecondsMax: null,
+      ...data,
+    };
+  
+    let result;
+    try {
+      this._topicSetDataValidate(topicSetData);
+      result = await dbCtx.result(this.statement.topicUpsert, topicSetData);
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not set topic data');
+      }
+      this.logger.debug(_scope, 'success', { topicSetData, ...this._resultLog(result) });
+      return this._engineInfo(result);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, result });
+      throw e;
+    }
+  }
+
+
+  async topicSetContent(dbCtx, data) {
+    const _scope = _fileScope('topicSetContent');
+    const topicSetContentData = {
+      contentType: null,
+      ...data,
+    };
+    const logData = {
+      ...topicSetContentData,
+      content: common.logTruncate(topicSetContentData.content, 100),
+    };
+    this.logger.debug(_scope, 'called', data);
+
+    let result;
+    try {
+      this._topicSetContentDataValidate(topicSetContentData);
+      result = await dbCtx.result(this.statement.topicSetContent, topicSetContentData);
+      logData.result = this._resultLog(result);
+      if (result.rowCount !=  1) {
+        throw new DBErrors.UnexpectedResult('did not set topic content');
+      }
+      this.logger.debug(_scope, 'success', { ...logData });
+      return this._engineInfo(result);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, ...logData });
+      throw e;
+    }
+  }
+
+
+  async topicUpdate(dbCtx, data) {
+    const _scope = _fileScope('topicUpdate');
+    this.logger.debug(_scope, 'called', { data });
+
+    const topicData = {
+      leaseSecondsPreferred: null,
+      leaseSecondsMin: null,
+      leaseSecondsMax: null,
+      publisherValidationUrl: null,
+      ...data,
+    };
+
+    this._topicUpdateDataValidate(topicData);
+
+    let result;
+    try {
+      result = await dbCtx.result(this.statement.topicUpdate, topicData);
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not update topic');
+      }
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, topicData });
+      throw e;
+    }
+  }
+
+
+  async verificationClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
+    const _scope = _fileScope('verificationClaim');
+    this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
+
+    let result;
+    try {
+      await dbCtx.txIf(async (txCtx) => {
+        result = await txCtx.manyOrNone(this.statement.verificationClaim, { claimant, wanted, claimTimeoutSeconds });
+      });
+      return result.map((r) => r.id);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { wanted, claimTimeoutSeconds });
+      throw e;
+    }
+  }
+
+
+
+  async verificationClaimById(dbCtx, verificationId, claimTimeoutSeconds, claimant) {
+    const _scope = _fileScope('verificationClaimById');
+    this.logger.debug(_scope, 'called', { verificationId, claimant, claimTimeoutSeconds });
+
+    let result;
+    try {
+      await dbCtx.txIf(async (txCtx) => {
+        result = await txCtx.result(this.statement.verificationClaimById, { verificationId, claimant, claimTimeoutSeconds });
+      });
+      return this._engineInfo(result);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { verificationId, claimant, claimTimeoutSeconds });
+      throw e;
+    }
+  }
+
+
+  async verificationComplete(dbCtx, verificationId, callback, topicId) {
+    const _scope = _fileScope('verificationComplete');
+    this.logger.debug(_scope, 'called', { verificationId });
+
+    let result;
+    try {
+      await dbCtx.txIf(async (txCtx) => {
+        result = await txCtx.result(this.statement.verificationScrub, { verificationId, callback, topicId });
+        if (result.rowCount < 1) {
+          throw new DBErrors.UnexpectedResult('did not remove verifications');
+        }
+      });
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { verificationId });
+      throw e;
+    }
+    return this._engineInfo(result);
+  }
+
+
+  async verificationGetById(dbCtx, verificationId) {
+    const _scope = _fileScope('verificationGetById');
+    this.logger.debug(_scope, 'called', { verificationId });
+
+    let verification;
+    try {
+      verification = await dbCtx.oneOrNone(this.statement.verificationGetById, { verificationId });
+      return verification;
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, verificationId });
+      throw e;
+    }
+  }
+
+
+  async verificationIncomplete(dbCtx, verificationId, retryDelays = [60]) {
+    const _scope = _fileScope('verificationIncomplete');
+    this.logger.debug(_scope, 'called', { verificationId });
+
+    let result;
+    try {
+      await dbCtx.txIf(async (txCtx) => {
+        const { attempts } = await txCtx.one(this.statement.verificationAttempts, { verificationId });
+        const nextAttemptDelaySeconds = common.attemptRetrySeconds(attempts, retryDelays);
+        result = await txCtx.result(this.statement.verificationAttemptIncrement, { verificationId, nextAttemptDelaySeconds });
+        if (result.rowCount != 1) {
+          throw new DBErrors.UnexpectedResult('did not update verification attempts');
+        }
+        result = await txCtx.result(this.statement.verificationDone, { verificationId });
+        if (result.rowCount != 1) {
+          throw new DBErrors.UnexpectedResult('did not release verification');
+        }
+      });
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, verificationId });
+      throw e;
+    }
+  }
+
+
+  async verificationInsert(dbCtx, verification) {
+    const _scope = _fileScope('verificationInsert');
+    this.logger.debug(_scope, 'called', { verification });
+
+    const verificationData = {
+      secret: null,
+      httpRemoteAddr: null,
+      httpFrom: null,
+      requestId: null,
+      ...verification,
+    };
+
+    let result, verificationId;
+    try {
+      this._verificationDataValidate(verificationData);
+      result = await dbCtx.result(this.statement.verificationInsert, verificationData);
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not insert verification');
+      }
+      verificationId = result.rows[0].id;
+      this.logger.debug(_scope, 'inserted verification', { verificationId });
+
+      return verificationId;
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, verificationData });
+      throw e;
+    }
+  }
+
+
+  async verificationRelease(dbCtx, verificationId) {
+    const _scope = _fileScope('verificationRelease');
+    this.logger.debug(_scope, 'called', { verificationId });
+
+    let result;
+    try {
+      result = await dbCtx.result(this.statement.verificationDone, { verificationId });
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not release verification');
+      }
+      return this._engineInfo(result);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, verificationId });
+      throw e;
+    }
+  }
+
+
+  async verificationUpdate(dbCtx, verificationId, data) {
+    const _scope = _fileScope('verificationUpdate');
+    this.logger.debug(_scope, 'called', { verificationId, data });
+
+    const verificationData = {
+      reason: null,
+      verificationId,
+      ...data,
+    };
+
+    let result;
+    try {
+      this._verificationUpdateDataValidate(verificationData);
+      result = await dbCtx.result(this.statement.verificationUpdate, verificationData);
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not update verification');
+      }
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, verificationData });
+      throw e;
+    }
+  }
+
+
+  async verificationValidated(dbCtx, verificationId) {
+    const _scope = _fileScope('verificationValidated');
+    this.logger.debug(_scope, 'called', { verificationId });
+
+    let result;
+    try {
+      result = await dbCtx.result(this.statement.verificationValidate, { verificationId });
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not set verification validation');
+      }
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, verificationId });
+      throw e;
+    } 
+  }
+
+}
+
+module.exports = DatabasePostgres;