initial commit
[squeep-indie-auther] / src / db / postgres / index.js
diff --git a/src/db/postgres/index.js b/src/db/postgres/index.js
new file mode 100644 (file)
index 0000000..54c6e9b
--- /dev/null
@@ -0,0 +1,638 @@
+/* eslint-disable security/detect-object-injection */
+'use strict';
+
+const pgpInitOptions = {
+  capSQL: true,
+};
+
+const path = require('path');
+const pgp = require('pg-promise')(pgpInitOptions);
+const svh = require('../schema-version-helper');
+const Database = require('../abstract');
+const DBErrors = require('../errors');
+const common = require('../../common');
+
+const _fileScope = common.fileScope(__filename);
+
+const PGTypeIdINT8 = 20; // Type Id 20 == INT8 (BIGINT)
+const PGTYpeIdINT8Array = 1016; //Type Id 1016 == INT8[] (BIGINT[])
+pgp.pg.types.setTypeParser(PGTypeIdINT8, BigInt); // Type Id 20 = INT8 (BIGINT)
+const parseBigIntArray = pgp.pg.types.getTypeParser(PGTYpeIdINT8Array); // Type Id 1016 = INT8[] (BIGINT[])
+pgp.pg.types.setTypeParser(PGTYpeIdINT8Array, (a) => parseBigIntArray(a).map(BigInt));
+
+const schemaVersionsSupported = {
+  min: {
+    major: 1,
+    minor: 0,
+    patch: 0,
+  },
+  max: {
+    major: 1,
+    minor: 0,
+    patch: 0,
+  },
+};
+
+class DatabasePostgres extends Database {
+  constructor(logger, options, _pgp = pgp) {
+    super(logger, options);
+
+    this.db = _pgp(options.db.connectionString);
+    this.schemaVersionsSupported = schemaVersionsSupported;
+
+    // Suppress QF warnings when running tests
+    this.noWarnings = options.db.noWarnings;
+
+    // Log queries
+    const queryLogLevel = options.db.queryLogLevel;
+    if (queryLogLevel) {
+      const queryScope = _fileScope('pgp:query');
+      pgpInitOptions.query = (event) => {
+        this.logger[queryLogLevel](queryScope, '', { ...common.pick(event, ['query', 'params']) });
+      };
+    }
+
+    // Log errors
+    const errorScope = _fileScope('pgp:error');
+    pgpInitOptions.error = (err, event) => {
+      this.logger.error(errorScope, '', { err, event });
+    };
+
+    // Deophidiate column names in-place, log results
+    pgpInitOptions.receive = (data, result, event) => {
+      const exemplaryRow = data[0];
+      for (const prop in exemplaryRow) {
+        const camel = common.camelfy(prop);
+        if (!(camel in exemplaryRow)) {
+          for (const d of data) {
+            d[camel] = d[prop];
+            delete d[prop];
+          }
+        }
+      }
+      if (queryLogLevel) {
+        // Omitting .rows
+        const resultLog = common.pick(result, ['command', 'rowCount', 'duration']);
+        this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog });
+      }
+    };
+
+    // Expose these for test coverage
+    this.pgpInitOptions = pgpInitOptions;
+    this._pgp = _pgp;
+
+    this._initStatements(_pgp);
+  }
+
+
+  _queryFileHelper(_pgp) {
+    return (file) => {
+      const _scope = _fileScope('_queryFile');
+      /* istanbul ignore next */
+      const qfParams = {
+        minify: true,
+        ...(this.noWarnings && { noWarnings: this.noWarnings }),
+      };
+      const qf = new _pgp.QueryFile(file, qfParams);
+      if (qf.error) {
+        this.logger.error(_scope, 'failed to create SQL statement', { error: qf.error, file });
+        throw qf.error;
+      }
+      return qf;
+    };
+  }
+
+
+  async initialize(applyMigrations = true) {
+    const _scope = _fileScope('initialize');
+    this.logger.debug(_scope, 'called', { applyMigrations });
+    if (applyMigrations) {
+      await this._initTables();
+    }
+    await super.initialize();
+    if (this.listener) {
+      await this.listener.start();
+    }
+  }
+
+
+  async _initTables(_pgp) {
+    const _scope = _fileScope('_initTables');
+    this.logger.debug(_scope, 'called', {});
+
+    const _queryFile = this._queryFileHelper(_pgp || this._pgp);
+
+    // Migrations rely upon this table, ensure it exists.
+    const metaVersionTable = '_meta_schema_version';
+
+    const tableExists = async (name) => this.db.oneOrNone('SELECT table_name FROM information_schema.tables WHERE table_name=$(name)', { name });
+    let metaExists = await tableExists(metaVersionTable);
+    if (!metaExists) {
+      const fPath = path.join(__dirname, 'sql', 'schema', 'init.sql');
+      const initSql = _queryFile(fPath);
+      const results = await this.db.multiResult(initSql);
+      this.logger.debug(_scope, 'executed init sql', { results });
+      metaExists = await tableExists(metaVersionTable);
+      /* istanbul ignore if */
+      if (!metaExists) {
+        throw new DBErrors.UnexpectedResult(`did not create ${metaVersionTable} table`);
+      }
+      this.logger.info(_scope, 'created schema version table', { metaVersionTable });
+    }
+
+    // Apply migrations
+    const currentSchema = await this._currentSchema();
+    const migrationsWanted = svh.unappliedSchemaVersions(__dirname, currentSchema, this.schemaVersionsSupported);
+    this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
+    for (const v of migrationsWanted) {
+      const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
+      const migrationSql = _queryFile(fPath);
+      const results = await this.db.multiResult(migrationSql);
+      this.logger.debug(_scope, 'executed migration sql', { version: v, results });
+      this.logger.info(_scope, 'applied migration', { version: v });
+    }
+  }
+
+  
+  _initStatements(_pgp) {
+    const _scope = _fileScope('_initStatements');
+    const _queryFile = this._queryFileHelper(_pgp);
+    this.statement = _pgp.utils.enumSql(path.join(__dirname, 'sql'), {}, _queryFile);
+    this.logger.debug(_scope, 'statements initialized', { statements: Object.keys(this.statement).length });
+  }
+
+  
+  async healthCheck() {
+    const _scope = _fileScope('healthCheck');
+    this.logger.debug(_scope, 'called', {});
+    const c = await this.db.connect();
+    c.done();
+    return { serverVersion: c.client.serverVersion };
+  }
+
+
+  async _currentSchema() {
+    return this.db.one('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1');
+  }
+
+  
+  async _closeConnection() {
+    const _scope = _fileScope('_closeConnection');
+    try {
+      if (this.listener) {
+        await this.listener.stop();
+      }
+      await this._pgp.end();
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e });
+      throw e;
+    }
+  }
+
+  
+  /* istanbul ignore next */
+  async _purgeTables(really = false) {
+    const _scope = _fileScope('_purgeTables');
+    try {
+      if (really) {
+        await this.db.tx(async (t) => {
+          await t.batch([
+            'authentication',
+            'resource',
+            'profile',
+            'token',
+          ].map(async (table) => t.query('TRUNCATE TABLE $(table:name) CASCADE', { table })));
+        });
+      }
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e });
+      throw e;
+    }
+  }
+
+
+  async context(fn) {
+    return this.db.task(async (t) => fn(t));
+  }
+
+
+  // eslint-disable-next-line class-methods-use-this
+  async transaction(dbCtx, fn) {
+    return dbCtx.txIf(async (t) => fn(t));
+  }
+
+
+  async almanacGetAll(dbCtx) {
+    const _scope = _fileScope('almanacGetAll');
+    this.logger.debug(_scope, 'called');
+
+    try {
+      return await dbCtx.manyOrNone(this.statement.almanacGetAll);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e });
+      throw e;
+    }
+  }
+
+
+  async authenticationGet(dbCtx, identifier) {
+    const _scope = _fileScope('authenticationGet');
+    this.logger.debug(_scope, 'called', { identifier });
+
+    try {
+      return await dbCtx.oneOrNone(this.statement.authenticationGet, { identifier });
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, identifier });
+      throw e;
+    }
+  }
+
+
+  async authenticationSuccess(dbCtx, identifier) {
+    const _scope = _fileScope('authenticationSuccess');
+    this.logger.debug(_scope, 'called', { identifier });
+
+    try {
+      const result = await dbCtx.result(this.statement.authenticationSuccess, { identifier });
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not update authentication success event');
+      }
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, identifier });
+      throw e;
+    }
+  }
+
+
+  async authenticationUpsert(dbCtx, identifier, credential) {
+    const _scope = _fileScope('authenticationUpsert');
+    const scrubbedCredential = '*'.repeat((credential || '').length);
+    this.logger.debug(_scope, 'called', { identifier, scrubbedCredential });
+
+    try {
+      const result = await dbCtx.result(this.statement.authenticationUpsert, { identifier, credential });
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not upsert authentication');
+      }
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential });
+      throw e;
+    }
+  }
+
+
+  async profileIdentifierInsert(dbCtx, profile, identifier) {
+    const _scope = _fileScope('profileIdentifierInsert');
+    this.logger.debug(_scope, 'called', { profile, identifier });
+
+    try {
+      const result = await dbCtx.result(this.statement.profileIdentifierInsert, { profile, identifier });
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not insert identifier');
+      }
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, profile, identifier });
+      throw e;
+    }
+  }
+
+
+  async profileIsValid(dbCtx, profile) {
+    const _scope = _fileScope('profileIsValid');
+    this.logger.debug(_scope, 'called', { profile });
+
+    try {
+      const profileResponse = await dbCtx.oneOrNone(this.statement.profileGet, { profile });
+      return !!profileResponse;
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, profile });
+      throw e;
+    }
+  }
+
+
+  async profileScopeInsert(dbCtx, profile, scope) {
+    const _scope = _fileScope('profileScopeInsert');
+    this.logger.debug(_scope, 'called', { profile, scope });
+
+    try {
+      const result = await dbCtx.result(this.statement.profileScopeInsert, { profile, scope });
+      // Duplicate inserts get ignored
+      if (result.rowCount != 1 && result.rowCount != 0) {
+        throw new DBErrors.UnexpectedResult('did not insert profile scope');
+      }
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, profile, scope });
+      throw e;
+    }
+  }
+
+
+  async profileScopesSetAll(dbCtx, profile, scopes) {
+    const _scope = _fileScope('profileScopesSetAll');
+    this.logger.debug(_scope, 'called', { profile, scopes });
+
+    try {
+      await this.transaction(dbCtx, async (txCtx) => {
+        await txCtx.result(this.statement.profileScopesClear, { profile });
+        if (scopes.length) {
+          await txCtx.result(this.statement.profileScopesSetAll, { profile, scopes });
+        }
+      }); // transaction
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, profile, scopes });
+      throw e;
+    }
+  }
+
+
+  async profilesScopesByIdentifier(dbCtx, identifier) {
+    const _scope = _fileScope('profilesScopesByIdentifier');
+    this.logger.debug(_scope, 'called', { identifier });
+
+    try {
+      const profileScopesRows = await dbCtx.manyOrNone(this.statement.profilesScopesByIdentifier, { identifier });
+      return Database._profilesScopesBuilder(profileScopesRows);
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, identifier });
+      throw e;
+    }
+  }
+
+
+  async redeemCode(dbCtx, { codeId, created, isToken, clientId, profile, identifier, scopes, lifespanSeconds, refreshLifespanSeconds, resource, profileData }) {
+    const _scope = _fileScope('redeemCode');
+    this.logger.debug(_scope, 'called', { codeId, created, isToken, clientId, profile, identifier, scopes, lifespanSeconds, refreshLifespanSeconds, resource, profileData });
+
+    let result, ret = false;
+    try {
+      await this.transaction(dbCtx, async (txCtx) => {
+        result = await txCtx.result(this.statement.redeemCode, { codeId, created, isToken, clientId, profile, identifier, lifespanSeconds, refreshLifespanSeconds, resource, profileData });
+        if (result.rowCount != 1) {
+          this.logger.error(_scope, 'failed', { result });
+          throw new DBErrors.UnexpectedResult('did not redeem code');
+        }
+        // Abort and return false if redemption resulted in revocation.
+        if (result.rows[0].isRevoked) {
+          return;
+        }
+        this.logger.debug(_scope, 'code redeemed', { redeemed: result.rows[0] });
+
+        // Ensure there are entries for all scopes.
+        if (scopes.length !== 0) {
+          await txCtx.result(this.statement.scopesInsert, { scopes });
+        }
+
+        // Record accepted scopes for this token.
+        result = await txCtx.result(this.statement.tokenScopesSet, { codeId, scopes });
+        if (result.rowCount != scopes.length) {
+          this.logger.error(_scope, 'token scope count mismatch', { codeId, scopes, result });
+          throw new DBErrors.UnexpectedResult('did not set all scopes on token');
+        }
+        ret = true;
+      }); // txCtx
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, codeId, created, isToken, clientId, profile, identifier, scopes, lifespanSeconds, refreshLifespanSeconds, profileData });
+      throw e;
+    }
+
+    return ret;
+  }
+
+
+  async refreshCode(dbCtx, codeId, refreshed, removeScopes) {
+    const _scope = _fileScope('refreshCode');
+    this.logger.debug(_scope, 'called', { codeId, refreshed, removeScopes });
+
+    try {
+      return await this.transaction(dbCtx, async (txCtx) => {
+        const refreshedToken = await txCtx.oneOrNone(this.statement.refreshCode, { codeId, refreshed });
+        if (refreshedToken) {
+          if (removeScopes.length) {
+            const removeResult = await txCtx.result(this.statement.tokenScopesRemove, { codeId, removeScopes });
+            if (removeResult.rowCount != removeScopes.length) {
+              this.logger.error(_scope, 'failed to remove token scopes', { actual: removeResult.rowCount, expected: removeScopes.length });
+              throw new DBErrors.UnexpectedResult('did not remove scopes from token');
+            }
+          } else {
+            delete refreshedToken.scopes; // Not updated, remove from response.
+          }
+        } else {
+          this.logger.debug(_scope, 'did not refresh token', {});
+        }
+        return refreshedToken;
+      });
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, codeId });
+      throw e;
+    }
+  }
+
+
+  async resourceGet(dbCtx, resourceId) {
+    const _scope = _fileScope('resourceGet');
+    this.logger.debug(_scope, 'called', { resourceId });
+
+    try {
+      return await dbCtx.oneOrNone(this.statement.resourceGet, { resourceId });
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, resourceId });
+      throw e;
+    }
+  }
+
+
+  async resourceUpsert(dbCtx, resourceId, secret, description) {
+    const _scope = _fileScope('resourceUpsert');
+    const logSecret = secret?.length && common.logTruncate('*'.repeat(secret.length), 3) || undefined;
+    this.logger.debug(_scope, 'called', { resourceId, secret: logSecret, description });
+
+    try {
+      const result = await dbCtx.result(this.statement.resourceUpsert, { resourceId, secret, description });
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not upsert resource');
+      }
+      return result.rows[0];
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, resourceId, secret: logSecret, description });
+      throw e;
+    }
+  }
+
+
+  async scopeCleanup(dbCtx, atLeastMsSinceLast) {
+    const _scope = _fileScope('scopeCleanup');
+    this.logger.debug(_scope, 'called', { atLeastMsSinceLast });
+
+    const almanacEvent = 'scopeCleanup';
+    try {
+      return await this.transaction(dbCtx, async (txCtx) => {
+
+        // Check that enough time has passed since last cleanup
+        const now = new Date();
+        const cleanupNotAfter = new Date(now.getTime() - atLeastMsSinceLast);
+        const { date: lastCleanupDate } = await txCtx.oneOrNone(this.statement.almanacGet, { event: almanacEvent }) || { date: new Date(0) };
+        if (lastCleanupDate >= cleanupNotAfter) {
+          this.logger.debug(_scope, 'skipping token cleanup, too soon', { lastCleanupDate, cleanupNotAfter, atLeastMsSinceLast });
+          return;
+        }
+
+        // Do the cleanup
+        const { rowCount: scopesRemoved } = await txCtx.result(this.statement.scopeCleanup);
+
+        // Update the last cleanup time
+        const result = await txCtx.result(this.statement.almanacUpsert, { event: almanacEvent, date: now });
+        if (result.rowCount != 1) {
+          throw new DBErrors.UnexpectedResult('did not update almanac');
+        }
+
+        this.logger.debug(_scope, 'completed', { scopesRemoved, atLeastMsSinceLast });
+        return scopesRemoved;
+      }); // tx
+
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, atLeastMsSinceLast });
+      throw e;
+    }
+  }
+
+
+  async scopeDelete(dbCtx, scope) {
+    const _scope = _fileScope('scopeDelete');
+    this.logger.debug(_scope, 'called', { scope });
+
+    try {
+      return await this.transaction(dbCtx, async (txCtx) => {
+        const { inUse } = await txCtx.one(this.statement.scopeInUse, { scope });
+        if (inUse) {
+          this.logger.debug(_scope, 'not deleted, in use', { scope });
+          return false;
+        }
+        const result = await txCtx.result(this.statement.scopeDelete, { scope });
+        if (result.rowCount == 0) {
+          this.logger.debug(_scope, 'no such scope', { scope });
+        } else {
+          this.logger.debug(_scope, 'deleted', { scope });
+        }
+        return true;
+      });
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, scope });
+      throw e;
+    }
+  }
+
+
+  async scopeUpsert(dbCtx, scope, application, description, manuallyAdded = false) {
+    const _scope = _fileScope('scopeUpsert');
+    this.logger.debug(_scope, 'called', { scope, description });
+
+    try {
+      const result = await dbCtx.result(this.statement.scopeUpsert, { scope, application, description, manuallyAdded });
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not upsert scope');
+      }
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, scope, application, description });
+      throw e;
+    }
+  }
+
+
+  async tokenCleanup(dbCtx, codeLifespanSeconds, atLeastMsSinceLast) {
+    const _scope = _fileScope('tokenCleanup');
+    this.logger.debug(_scope, 'called', { codeLifespanSeconds, atLeastMsSinceLast });
+
+    const almanacEvent = 'tokenCleanup';
+    try {
+      return await this.transaction(dbCtx, async (txCtx) => {
+
+        // Check that enough time has passed since last cleanup
+        const now = new Date();
+        const cleanupNotAfter = new Date(now.getTime() - atLeastMsSinceLast);
+        const { date: lastCleanupDate } = await txCtx.oneOrNone(this.statement.almanacGet, { event: almanacEvent }) || { date: new Date(0) };
+        if (lastCleanupDate >= cleanupNotAfter) {
+          this.logger.debug(_scope, 'skipping token cleanup, too soon', { lastCleanupDate, cleanupNotAfter, codeLifespanSeconds, atLeastMsSinceLast });
+          return;
+        }
+
+        // Do the cleanup
+        const { rowCount: tokensRemoved } = await txCtx.result(this.statement.tokenCleanup, { codeLifespanSeconds });
+
+        // Update the last cleanup time
+        const result = await txCtx.result(this.statement.almanacUpsert, { event: almanacEvent, date: now });
+        if (result.rowCount != 1) {
+          throw new DBErrors.UnexpectedResult('did not update almanac');
+        }
+
+        this.logger.debug(_scope, 'completed', { tokensRemoved, codeLifespanSeconds, atLeastMsSinceLast });
+        return tokensRemoved;
+      }); // tx
+
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, atLeastMsSinceLast });
+      throw e;
+    }
+  }
+
+
+  async tokenGetByCodeId(dbCtx, codeId) {
+    const _scope = _fileScope('tokenGetByCodeId');
+    this.logger.debug(_scope, 'called', { codeId });
+
+    try {
+      return await dbCtx.oneOrNone(this.statement.tokenGetByCodeId, { codeId });
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, codeId });
+      throw e;
+    }
+  }
+
+
+  async tokenRevokeByCodeId(dbCtx, codeId) {
+    const _scope = _fileScope('tokenRevokeByCodeId');
+    this.logger.debug(_scope, 'called', { codeId });
+
+    try {
+      const result = await dbCtx.result(this.statement.tokenRevokeByCodeId, { codeId });
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not revoke token');
+      }
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, codeId });
+      throw e;
+    }
+  }
+
+
+  async tokenRefreshRevokeByCodeId(dbCtx, codeId) {
+    const _scope = _fileScope('tokenRefreshRevokeByCodeId');
+    this.logger.debug(_scope, 'called', { codeId });
+
+    try {
+      const result = await dbCtx.result(this.statement.tokenRefreshRevokeByCodeId, { codeId });
+      if (result.rowCount != 1) {
+        throw new DBErrors.UnexpectedResult('did not revoke token');
+      }
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, codeId });
+      throw e;
+    }
+  }
+
+
+  async tokensGetByIdentifier(dbCtx, identifier) {
+    const _scope = _fileScope('tokensGetByIdentifier');
+    this.logger.debug(_scope, 'called', { identifier });
+
+    try {
+      return await dbCtx.manyOrNone(this.statement.tokensGetByIdentifier, { identifier });
+    } catch (e) {
+      this.logger.error(_scope, 'failed', { error: e, identifier });
+      throw e;
+    }
+  }
+
+}
+
+module.exports = DatabasePostgres;