X-Git-Url: http://git.squeep.com/?p=squeep-indie-auther;a=blobdiff_plain;f=src%2Fdb%2Fpostgres%2Findex.js;fp=src%2Fdb%2Fpostgres%2Findex.js;h=54c6e9b72ba2ccee85078926113fcc2dea33c08d;hp=0000000000000000000000000000000000000000;hb=b0103b0d496262c438b40bc20304081dbfe41e73;hpb=8ed81748bce7cea7904cac7225b20a60cafdfc16 diff --git a/src/db/postgres/index.js b/src/db/postgres/index.js new file mode 100644 index 0000000..54c6e9b --- /dev/null +++ b/src/db/postgres/index.js @@ -0,0 +1,638 @@ +/* eslint-disable security/detect-object-injection */ +'use strict'; + +const pgpInitOptions = { + capSQL: true, +}; + +const path = require('path'); +const pgp = require('pg-promise')(pgpInitOptions); +const svh = require('../schema-version-helper'); +const Database = require('../abstract'); +const DBErrors = require('../errors'); +const common = require('../../common'); + +const _fileScope = common.fileScope(__filename); + +const PGTypeIdINT8 = 20; // Type Id 20 == INT8 (BIGINT) +const PGTYpeIdINT8Array = 1016; //Type Id 1016 == INT8[] (BIGINT[]) +pgp.pg.types.setTypeParser(PGTypeIdINT8, BigInt); // Type Id 20 = INT8 (BIGINT) +const parseBigIntArray = pgp.pg.types.getTypeParser(PGTYpeIdINT8Array); // Type Id 1016 = INT8[] (BIGINT[]) +pgp.pg.types.setTypeParser(PGTYpeIdINT8Array, (a) => parseBigIntArray(a).map(BigInt)); + +const schemaVersionsSupported = { + min: { + major: 1, + minor: 0, + patch: 0, + }, + max: { + major: 1, + minor: 0, + patch: 0, + }, +}; + +class DatabasePostgres extends Database { + constructor(logger, options, _pgp = pgp) { + super(logger, options); + + this.db = _pgp(options.db.connectionString); + this.schemaVersionsSupported = schemaVersionsSupported; + + // Suppress QF warnings when running tests + this.noWarnings = options.db.noWarnings; + + // Log queries + const queryLogLevel = options.db.queryLogLevel; + if (queryLogLevel) { + const queryScope = _fileScope('pgp:query'); + pgpInitOptions.query = (event) => { + this.logger[queryLogLevel](queryScope, '', { ...common.pick(event, ['query', 'params']) }); + }; + } + + // Log errors + const errorScope = _fileScope('pgp:error'); + pgpInitOptions.error = (err, event) => { + this.logger.error(errorScope, '', { err, event }); + }; + + // Deophidiate column names in-place, log results + pgpInitOptions.receive = (data, result, event) => { + const exemplaryRow = data[0]; + for (const prop in exemplaryRow) { + const camel = common.camelfy(prop); + if (!(camel in exemplaryRow)) { + for (const d of data) { + d[camel] = d[prop]; + delete d[prop]; + } + } + } + if (queryLogLevel) { + // Omitting .rows + const resultLog = common.pick(result, ['command', 'rowCount', 'duration']); + this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog }); + } + }; + + // Expose these for test coverage + this.pgpInitOptions = pgpInitOptions; + this._pgp = _pgp; + + this._initStatements(_pgp); + } + + + _queryFileHelper(_pgp) { + return (file) => { + const _scope = _fileScope('_queryFile'); + /* istanbul ignore next */ + const qfParams = { + minify: true, + ...(this.noWarnings && { noWarnings: this.noWarnings }), + }; + const qf = new _pgp.QueryFile(file, qfParams); + if (qf.error) { + this.logger.error(_scope, 'failed to create SQL statement', { error: qf.error, file }); + throw qf.error; + } + return qf; + }; + } + + + async initialize(applyMigrations = true) { + const _scope = _fileScope('initialize'); + this.logger.debug(_scope, 'called', { applyMigrations }); + if (applyMigrations) { + await this._initTables(); + } + await super.initialize(); + if (this.listener) { + await this.listener.start(); + } + } + + + async _initTables(_pgp) { + const _scope = _fileScope('_initTables'); + this.logger.debug(_scope, 'called', {}); + + const _queryFile = this._queryFileHelper(_pgp || this._pgp); + + // Migrations rely upon this table, ensure it exists. + const metaVersionTable = '_meta_schema_version'; + + const tableExists = async (name) => this.db.oneOrNone('SELECT table_name FROM information_schema.tables WHERE table_name=$(name)', { name }); + let metaExists = await tableExists(metaVersionTable); + if (!metaExists) { + const fPath = path.join(__dirname, 'sql', 'schema', 'init.sql'); + const initSql = _queryFile(fPath); + const results = await this.db.multiResult(initSql); + this.logger.debug(_scope, 'executed init sql', { results }); + metaExists = await tableExists(metaVersionTable); + /* istanbul ignore if */ + if (!metaExists) { + throw new DBErrors.UnexpectedResult(`did not create ${metaVersionTable} table`); + } + this.logger.info(_scope, 'created schema version table', { metaVersionTable }); + } + + // Apply migrations + const currentSchema = await this._currentSchema(); + const migrationsWanted = svh.unappliedSchemaVersions(__dirname, currentSchema, this.schemaVersionsSupported); + this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted }); + for (const v of migrationsWanted) { + const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql'); + const migrationSql = _queryFile(fPath); + const results = await this.db.multiResult(migrationSql); + this.logger.debug(_scope, 'executed migration sql', { version: v, results }); + this.logger.info(_scope, 'applied migration', { version: v }); + } + } + + + _initStatements(_pgp) { + const _scope = _fileScope('_initStatements'); + const _queryFile = this._queryFileHelper(_pgp); + this.statement = _pgp.utils.enumSql(path.join(__dirname, 'sql'), {}, _queryFile); + this.logger.debug(_scope, 'statements initialized', { statements: Object.keys(this.statement).length }); + } + + + async healthCheck() { + const _scope = _fileScope('healthCheck'); + this.logger.debug(_scope, 'called', {}); + const c = await this.db.connect(); + c.done(); + return { serverVersion: c.client.serverVersion }; + } + + + async _currentSchema() { + return this.db.one('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1'); + } + + + async _closeConnection() { + const _scope = _fileScope('_closeConnection'); + try { + if (this.listener) { + await this.listener.stop(); + } + await this._pgp.end(); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e }); + throw e; + } + } + + + /* istanbul ignore next */ + async _purgeTables(really = false) { + const _scope = _fileScope('_purgeTables'); + try { + if (really) { + await this.db.tx(async (t) => { + await t.batch([ + 'authentication', + 'resource', + 'profile', + 'token', + ].map(async (table) => t.query('TRUNCATE TABLE $(table:name) CASCADE', { table }))); + }); + } + } catch (e) { + this.logger.error(_scope, 'failed', { error: e }); + throw e; + } + } + + + async context(fn) { + return this.db.task(async (t) => fn(t)); + } + + + // eslint-disable-next-line class-methods-use-this + async transaction(dbCtx, fn) { + return dbCtx.txIf(async (t) => fn(t)); + } + + + async almanacGetAll(dbCtx) { + const _scope = _fileScope('almanacGetAll'); + this.logger.debug(_scope, 'called'); + + try { + return await dbCtx.manyOrNone(this.statement.almanacGetAll); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e }); + throw e; + } + } + + + async authenticationGet(dbCtx, identifier) { + const _scope = _fileScope('authenticationGet'); + this.logger.debug(_scope, 'called', { identifier }); + + try { + return await dbCtx.oneOrNone(this.statement.authenticationGet, { identifier }); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, identifier }); + throw e; + } + } + + + async authenticationSuccess(dbCtx, identifier) { + const _scope = _fileScope('authenticationSuccess'); + this.logger.debug(_scope, 'called', { identifier }); + + try { + const result = await dbCtx.result(this.statement.authenticationSuccess, { identifier }); + if (result.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not update authentication success event'); + } + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, identifier }); + throw e; + } + } + + + async authenticationUpsert(dbCtx, identifier, credential) { + const _scope = _fileScope('authenticationUpsert'); + const scrubbedCredential = '*'.repeat((credential || '').length); + this.logger.debug(_scope, 'called', { identifier, scrubbedCredential }); + + try { + const result = await dbCtx.result(this.statement.authenticationUpsert, { identifier, credential }); + if (result.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not upsert authentication'); + } + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential }); + throw e; + } + } + + + async profileIdentifierInsert(dbCtx, profile, identifier) { + const _scope = _fileScope('profileIdentifierInsert'); + this.logger.debug(_scope, 'called', { profile, identifier }); + + try { + const result = await dbCtx.result(this.statement.profileIdentifierInsert, { profile, identifier }); + if (result.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not insert identifier'); + } + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, profile, identifier }); + throw e; + } + } + + + async profileIsValid(dbCtx, profile) { + const _scope = _fileScope('profileIsValid'); + this.logger.debug(_scope, 'called', { profile }); + + try { + const profileResponse = await dbCtx.oneOrNone(this.statement.profileGet, { profile }); + return !!profileResponse; + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, profile }); + throw e; + } + } + + + async profileScopeInsert(dbCtx, profile, scope) { + const _scope = _fileScope('profileScopeInsert'); + this.logger.debug(_scope, 'called', { profile, scope }); + + try { + const result = await dbCtx.result(this.statement.profileScopeInsert, { profile, scope }); + // Duplicate inserts get ignored + if (result.rowCount != 1 && result.rowCount != 0) { + throw new DBErrors.UnexpectedResult('did not insert profile scope'); + } + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, profile, scope }); + throw e; + } + } + + + async profileScopesSetAll(dbCtx, profile, scopes) { + const _scope = _fileScope('profileScopesSetAll'); + this.logger.debug(_scope, 'called', { profile, scopes }); + + try { + await this.transaction(dbCtx, async (txCtx) => { + await txCtx.result(this.statement.profileScopesClear, { profile }); + if (scopes.length) { + await txCtx.result(this.statement.profileScopesSetAll, { profile, scopes }); + } + }); // transaction + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, profile, scopes }); + throw e; + } + } + + + async profilesScopesByIdentifier(dbCtx, identifier) { + const _scope = _fileScope('profilesScopesByIdentifier'); + this.logger.debug(_scope, 'called', { identifier }); + + try { + const profileScopesRows = await dbCtx.manyOrNone(this.statement.profilesScopesByIdentifier, { identifier }); + return Database._profilesScopesBuilder(profileScopesRows); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, identifier }); + throw e; + } + } + + + async redeemCode(dbCtx, { codeId, created, isToken, clientId, profile, identifier, scopes, lifespanSeconds, refreshLifespanSeconds, resource, profileData }) { + const _scope = _fileScope('redeemCode'); + this.logger.debug(_scope, 'called', { codeId, created, isToken, clientId, profile, identifier, scopes, lifespanSeconds, refreshLifespanSeconds, resource, profileData }); + + let result, ret = false; + try { + await this.transaction(dbCtx, async (txCtx) => { + result = await txCtx.result(this.statement.redeemCode, { codeId, created, isToken, clientId, profile, identifier, lifespanSeconds, refreshLifespanSeconds, resource, profileData }); + if (result.rowCount != 1) { + this.logger.error(_scope, 'failed', { result }); + throw new DBErrors.UnexpectedResult('did not redeem code'); + } + // Abort and return false if redemption resulted in revocation. + if (result.rows[0].isRevoked) { + return; + } + this.logger.debug(_scope, 'code redeemed', { redeemed: result.rows[0] }); + + // Ensure there are entries for all scopes. + if (scopes.length !== 0) { + await txCtx.result(this.statement.scopesInsert, { scopes }); + } + + // Record accepted scopes for this token. + result = await txCtx.result(this.statement.tokenScopesSet, { codeId, scopes }); + if (result.rowCount != scopes.length) { + this.logger.error(_scope, 'token scope count mismatch', { codeId, scopes, result }); + throw new DBErrors.UnexpectedResult('did not set all scopes on token'); + } + ret = true; + }); // txCtx + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, codeId, created, isToken, clientId, profile, identifier, scopes, lifespanSeconds, refreshLifespanSeconds, profileData }); + throw e; + } + + return ret; + } + + + async refreshCode(dbCtx, codeId, refreshed, removeScopes) { + const _scope = _fileScope('refreshCode'); + this.logger.debug(_scope, 'called', { codeId, refreshed, removeScopes }); + + try { + return await this.transaction(dbCtx, async (txCtx) => { + const refreshedToken = await txCtx.oneOrNone(this.statement.refreshCode, { codeId, refreshed }); + if (refreshedToken) { + if (removeScopes.length) { + const removeResult = await txCtx.result(this.statement.tokenScopesRemove, { codeId, removeScopes }); + if (removeResult.rowCount != removeScopes.length) { + this.logger.error(_scope, 'failed to remove token scopes', { actual: removeResult.rowCount, expected: removeScopes.length }); + throw new DBErrors.UnexpectedResult('did not remove scopes from token'); + } + } else { + delete refreshedToken.scopes; // Not updated, remove from response. + } + } else { + this.logger.debug(_scope, 'did not refresh token', {}); + } + return refreshedToken; + }); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, codeId }); + throw e; + } + } + + + async resourceGet(dbCtx, resourceId) { + const _scope = _fileScope('resourceGet'); + this.logger.debug(_scope, 'called', { resourceId }); + + try { + return await dbCtx.oneOrNone(this.statement.resourceGet, { resourceId }); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, resourceId }); + throw e; + } + } + + + async resourceUpsert(dbCtx, resourceId, secret, description) { + const _scope = _fileScope('resourceUpsert'); + const logSecret = secret?.length && common.logTruncate('*'.repeat(secret.length), 3) || undefined; + this.logger.debug(_scope, 'called', { resourceId, secret: logSecret, description }); + + try { + const result = await dbCtx.result(this.statement.resourceUpsert, { resourceId, secret, description }); + if (result.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not upsert resource'); + } + return result.rows[0]; + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, resourceId, secret: logSecret, description }); + throw e; + } + } + + + async scopeCleanup(dbCtx, atLeastMsSinceLast) { + const _scope = _fileScope('scopeCleanup'); + this.logger.debug(_scope, 'called', { atLeastMsSinceLast }); + + const almanacEvent = 'scopeCleanup'; + try { + return await this.transaction(dbCtx, async (txCtx) => { + + // Check that enough time has passed since last cleanup + const now = new Date(); + const cleanupNotAfter = new Date(now.getTime() - atLeastMsSinceLast); + const { date: lastCleanupDate } = await txCtx.oneOrNone(this.statement.almanacGet, { event: almanacEvent }) || { date: new Date(0) }; + if (lastCleanupDate >= cleanupNotAfter) { + this.logger.debug(_scope, 'skipping token cleanup, too soon', { lastCleanupDate, cleanupNotAfter, atLeastMsSinceLast }); + return; + } + + // Do the cleanup + const { rowCount: scopesRemoved } = await txCtx.result(this.statement.scopeCleanup); + + // Update the last cleanup time + const result = await txCtx.result(this.statement.almanacUpsert, { event: almanacEvent, date: now }); + if (result.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not update almanac'); + } + + this.logger.debug(_scope, 'completed', { scopesRemoved, atLeastMsSinceLast }); + return scopesRemoved; + }); // tx + + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, atLeastMsSinceLast }); + throw e; + } + } + + + async scopeDelete(dbCtx, scope) { + const _scope = _fileScope('scopeDelete'); + this.logger.debug(_scope, 'called', { scope }); + + try { + return await this.transaction(dbCtx, async (txCtx) => { + const { inUse } = await txCtx.one(this.statement.scopeInUse, { scope }); + if (inUse) { + this.logger.debug(_scope, 'not deleted, in use', { scope }); + return false; + } + const result = await txCtx.result(this.statement.scopeDelete, { scope }); + if (result.rowCount == 0) { + this.logger.debug(_scope, 'no such scope', { scope }); + } else { + this.logger.debug(_scope, 'deleted', { scope }); + } + return true; + }); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, scope }); + throw e; + } + } + + + async scopeUpsert(dbCtx, scope, application, description, manuallyAdded = false) { + const _scope = _fileScope('scopeUpsert'); + this.logger.debug(_scope, 'called', { scope, description }); + + try { + const result = await dbCtx.result(this.statement.scopeUpsert, { scope, application, description, manuallyAdded }); + if (result.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not upsert scope'); + } + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, scope, application, description }); + throw e; + } + } + + + async tokenCleanup(dbCtx, codeLifespanSeconds, atLeastMsSinceLast) { + const _scope = _fileScope('tokenCleanup'); + this.logger.debug(_scope, 'called', { codeLifespanSeconds, atLeastMsSinceLast }); + + const almanacEvent = 'tokenCleanup'; + try { + return await this.transaction(dbCtx, async (txCtx) => { + + // Check that enough time has passed since last cleanup + const now = new Date(); + const cleanupNotAfter = new Date(now.getTime() - atLeastMsSinceLast); + const { date: lastCleanupDate } = await txCtx.oneOrNone(this.statement.almanacGet, { event: almanacEvent }) || { date: new Date(0) }; + if (lastCleanupDate >= cleanupNotAfter) { + this.logger.debug(_scope, 'skipping token cleanup, too soon', { lastCleanupDate, cleanupNotAfter, codeLifespanSeconds, atLeastMsSinceLast }); + return; + } + + // Do the cleanup + const { rowCount: tokensRemoved } = await txCtx.result(this.statement.tokenCleanup, { codeLifespanSeconds }); + + // Update the last cleanup time + const result = await txCtx.result(this.statement.almanacUpsert, { event: almanacEvent, date: now }); + if (result.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not update almanac'); + } + + this.logger.debug(_scope, 'completed', { tokensRemoved, codeLifespanSeconds, atLeastMsSinceLast }); + return tokensRemoved; + }); // tx + + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, atLeastMsSinceLast }); + throw e; + } + } + + + async tokenGetByCodeId(dbCtx, codeId) { + const _scope = _fileScope('tokenGetByCodeId'); + this.logger.debug(_scope, 'called', { codeId }); + + try { + return await dbCtx.oneOrNone(this.statement.tokenGetByCodeId, { codeId }); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, codeId }); + throw e; + } + } + + + async tokenRevokeByCodeId(dbCtx, codeId) { + const _scope = _fileScope('tokenRevokeByCodeId'); + this.logger.debug(_scope, 'called', { codeId }); + + try { + const result = await dbCtx.result(this.statement.tokenRevokeByCodeId, { codeId }); + if (result.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not revoke token'); + } + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, codeId }); + throw e; + } + } + + + async tokenRefreshRevokeByCodeId(dbCtx, codeId) { + const _scope = _fileScope('tokenRefreshRevokeByCodeId'); + this.logger.debug(_scope, 'called', { codeId }); + + try { + const result = await dbCtx.result(this.statement.tokenRefreshRevokeByCodeId, { codeId }); + if (result.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not revoke token'); + } + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, codeId }); + throw e; + } + } + + + async tokensGetByIdentifier(dbCtx, identifier) { + const _scope = _fileScope('tokensGetByIdentifier'); + this.logger.debug(_scope, 'called', { identifier }); + + try { + return await dbCtx.manyOrNone(this.statement.tokensGetByIdentifier, { identifier }); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, identifier }); + throw e; + } + } + +} + +module.exports = DatabasePostgres;