+/* eslint-disable security/detect-object-injection */
+'use strict';
+
+const pgpInitOptions = {
+ capSQL: true,
+};
+
+const path = require('path');
+const pgp = require('pg-promise')(pgpInitOptions);
+const svh = require('../schema-version-helper');
+const Database = require('../base');
+const DBErrors = require('../errors');
+const Listener = require('./listener');
+const common = require('../../common');
+
+const _fileScope = common.fileScope(__filename);
+
+const PGTypeIdINT8 = 20; // Type Id 20 == INT8 (BIGINT)
+const PGTYpeIdINT8Array = 1016; //Type Id 1016 == INT8[] (BIGINT[])
+pgp.pg.types.setTypeParser(PGTypeIdINT8, BigInt); // Type Id 20 = INT8 (BIGINT)
+const parseBigIntArray = pgp.pg.types.getTypeParser(PGTYpeIdINT8Array); // Type Id 1016 = INT8[] (BIGINT[])
+pgp.pg.types.setTypeParser(PGTYpeIdINT8Array, (a) => parseBigIntArray(a).map(BigInt));
+
+const schemaVersionsSupported = {
+ min: {
+ major: 1,
+ minor: 0,
+ patch: 0,
+ },
+ max: {
+ major: 1,
+ minor: 0,
+ patch: 0,
+ },
+};
+
+class DatabasePostgres extends Database {
+ constructor(logger, options, _pgp = pgp) {
+ super(logger, options);
+
+ this.db = _pgp(options.db.connectionString);
+ this.schemaVersionsSupported = schemaVersionsSupported;
+
+ // Suppress QF warnings when running tests
+ this.noWarnings = options.db.noWarnings;
+
+ if (options.db.cacheEnabled) {
+ this.listener = new Listener(logger, this.db, Object.assign({}, options.db.listener, {
+ channel: 'cache_invalidation',
+ // dataCallback: this._topicChanged.bind(this),
+ connectionEstablishedCallback: this._listenerEstablished.bind(this),
+ connectionLostCallback: this._listenerLost.bind(this),
+ }));
+ }
+
+ // Log queries
+ const queryLogLevel = options.db.queryLogLevel;
+ if (queryLogLevel) {
+ pgpInitOptions.query = (event) => {
+ // Quell outgoing pings
+ if (event && event.query && event.query.startsWith('NOTIFY')) {
+ return;
+ }
+ this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event, ['query', 'params']) });
+ };
+ }
+
+ // Log errors
+ pgpInitOptions.error = (err, event) => {
+ this.logger.error(_fileScope('pgp:error'), '', { err, event });
+ };
+
+ // Deophidiate column names in-place, log results
+ pgpInitOptions.receive = (data, result, event) => {
+ const exemplaryRow = data[0];
+ for (const prop in exemplaryRow) {
+ const camel = Database._camelfy(prop);
+ if (!(camel in exemplaryRow)) {
+ for (const d of data) {
+ d[camel] = d[prop];
+ delete d[prop];
+ }
+ }
+ }
+ if (queryLogLevel) {
+ // Quell outgoing pings
+ if (result && result.command === 'NOTIFY') {
+ return;
+ }
+ // Omitting .rows
+ const resultLog = common.pick(result, ['command', 'rowCount', 'duration']);
+ this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog });
+ }
+ };
+
+ // Expose these for test coverage
+ this.pgpInitOptions = pgpInitOptions;
+ this._pgp = _pgp;
+
+ this._initStatements(_pgp);
+ }
+
+
+ _queryFileHelper(_pgp) {
+ return (file) => {
+ const _scope = _fileScope('_queryFile');
+ /* istanbul ignore next */
+ const qfParams = {
+ minify: true,
+ ...(this.noWarnings && { noWarnings: this.noWarnings }),
+ };
+ const qf = new _pgp.QueryFile(file, qfParams);
+ if (qf.error) {
+ this.logger.error(_scope, 'failed to create SQL statement', { error: qf.error, file });
+ throw qf.error;
+ }
+ return qf;
+ };
+ }
+
+
+ async initialize(applyMigrations = true) {
+ const _scope = _fileScope('initialize');
+ this.logger.debug(_scope, 'called', { applyMigrations });
+ if (applyMigrations) {
+ await this._initTables();
+ }
+ await super.initialize();
+ if (this.listener) {
+ await this.listener.start();
+ }
+ }
+
+
+ async _initTables(_pgp) {
+ const _scope = _fileScope('_initTables');
+ this.logger.debug(_scope, 'called', {});
+
+ const _queryFile = this._queryFileHelper(_pgp || this._pgp);
+
+ // Migrations rely upon this table, ensure it exists.
+ const metaVersionTable = '_meta_schema_version';
+
+ const tableExists = async (name) => this.db.oneOrNone('SELECT table_name FROM information_schema.tables WHERE table_name=$(name)', { name });
+ let metaExists = await tableExists(metaVersionTable);
+ if (!metaExists) {
+ const fPath = path.join(__dirname, 'sql', 'schema', 'init.sql');
+ const initSql = _queryFile(fPath);
+ const results = await this.db.multiResult(initSql);
+ this.logger.debug(_scope, 'executed init sql', { results });
+ metaExists = await tableExists(metaVersionTable);
+ /* istanbul ignore if */
+ if (!metaExists) {
+ throw new DBErrors.UnexpectedResult(`did not create ${metaVersionTable} table`);
+ }
+ this.logger.info(_scope, 'created schema version table', { metaVersionTable });
+ }
+
+ // Apply migrations
+ const currentSchema = await this._currentSchema();
+ const migrationsWanted = svh.unappliedSchemaVersions(__dirname, currentSchema, this.schemaVersionsSupported);
+ this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
+ for (const v of migrationsWanted) {
+ const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
+ const migrationSql = _queryFile(fPath);
+ const results = await this.db.multiResult(migrationSql);
+ this.logger.debug(_scope, 'executed migration sql', { version: v, results });
+ this.logger.info(_scope, 'applied migration', { version: v });
+ }
+ }
+
+
+ _initStatements(_pgp) {
+ const _scope = _fileScope('_initStatements');
+ const _queryFile = this._queryFileHelper(_pgp);
+ this.statement = _pgp.utils.enumSql(path.join(__dirname, 'sql'), {}, _queryFile);
+ this.logger.debug(_scope, 'statements initialized', { statements: Object.keys(this.statement).length });
+ }
+
+
+ async healthCheck() {
+ const _scope = _fileScope('healthCheck');
+ this.logger.debug(_scope, 'called', {});
+ const c = await this.db.connect();
+ c.done();
+ return { serverVersion: c.client.serverVersion };
+ }
+
+
+ async _currentSchema() {
+ return this.db.one('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1');
+ }
+
+
+ async _closeConnection() {
+ const _scope = _fileScope('_closeConnection');
+ try {
+ if (this.listener) {
+ await this.listener.stop();
+ }
+ await this._pgp.end();
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e });
+ throw e;
+ }
+ }
+
+
+ /* istanbul ignore next */
+ async _purgeTables(really = false) {
+ const _scope = _fileScope('_purgeTables');
+ try {
+ if (really) {
+ await this.db.tx(async (t) => {
+ await t.batch([
+ 'topic',
+ // 'topic_fetch_in_progress',
+ // 'verification',
+ // 'verification_in_progress',
+ // 'subscription',
+ // 'subscription_delivery_in_progress',
+ ].map(async (table) => t.query('TRUNCATE TABLE $(table:name) CASCADE', { table })));
+ });
+ }
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e });
+ throw e;
+ }
+ }
+
+
+ // eslint-disable-next-line class-methods-use-this
+ _resultLog(result) {
+ return common.pick(result, ['command', 'rowCount', 'duration']);
+ }
+
+
+ /**
+ * Receive notices when topic entry is updated.
+ * Clear relevant cache entry.
+ * @param {String} payload
+ */
+ // _topicChanged(payload) {
+ // const _scope = _fileScope('_topicChanged');
+ // if (payload !== 'ping') {
+ // this.logger.debug(_scope, 'called', { payload });
+ // this.cache.delete(payload);
+ // }
+ // }
+
+
+ /**
+ * Called when a listener connection is opened.
+ * Enable cache.
+ */
+ _listenerEstablished() {
+ const _scope = _fileScope('_listenerEstablished');
+ this.logger.debug(_scope, 'called', {});
+ this.cache = new Map();
+ }
+
+
+ /**
+ * Called when a listener connection is closed.
+ * Disable cache.
+ */
+ _listenerLost() {
+ const _scope = _fileScope('_listenerLost');
+ this.logger.debug(_scope, 'called', {});
+ delete this.cache;
+ }
+
+
+ /**
+ * Return a cached entry, if available.
+ * @param {*} key
+ */
+ // _cacheGet(key) {
+ // const _scope = _fileScope('_cacheGet');
+ // if (this.cache && this.cache.has(key)) {
+ // const cacheEntry = this.cache.get(key);
+ // this.logger.debug(_scope, 'found cache entry', { key, ...common.pick(cacheEntry, ['added', 'hits', 'lastHit']) });
+ // cacheEntry.hits += 1;
+ // cacheEntry.lastHit = new Date();
+ // return cacheEntry.data;
+ // }
+ // }
+
+
+ /**
+ * Store an entry in cache, if available.
+ * @param {*} key
+ * @param {*} data
+ */
+ // _cacheSet(key, data) {
+ // const _scope = _fileScope('_cacheSet');
+ // if (this.cache) {
+ // this.cache.set(key, {
+ // added: new Date(),
+ // hits: 0,
+ // lastHit: undefined,
+ // data,
+ // });
+ // this.logger.debug(_scope, 'added cache entry', { key });
+ // }
+ // }
+
+
+ async context(fn) {
+ return this.db.task(async (t) => fn(t));
+ }
+
+
+ // eslint-disable-next-line class-methods-use-this
+ async transaction(dbCtx, fn) {
+ return dbCtx.txIf(async (t) => fn(t));
+ }
+
+
+ async accountGetByProfile(dbCtx, profile) {
+ const _scope = _fileScope('accountGetByProfile');
+ this.logger.debug(_scope, 'called', { profile });
+
+ try {
+ return await dbCtx.oneOrNone(this.statement.accountGetByProfile, { profile });
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, profile });
+ throw e;
+ }
+ }
+
+
+ async accountInsert(dbCtx, profile) {
+ const _scope = _fileScope('accountInsert');
+ this.logger.debug(_scope, 'called', { profile });
+
+ try {
+ const result = await dbCtx.result(this.statement.accountInsert, { profile });
+ if (result.rowCount != 1) {
+ throw new DBErrors.UnexpectedResult('did not insert account');
+ }
+ const account = result.rows[0];
+ this.logger.debug(_scope, 'success', { ...account });
+ return account;
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, profile });
+ throw e;
+ }
+ }
+
+
+ async channelInsert(dbCtx, accountId, name, uid) {
+ const _scope = _fileScope('channelInsert');
+ this.logger.debug(_scope, 'called', { accountId, name, uid });
+
+ try {
+ return await this.transaction(dbCtx, async (txCtx) => {
+ const insertResult = await txCtx.result(this.statement.channelInsert, { accountId, name, uid });
+ if (insertResult.rowCount != 1) {
+ throw new DBErrors.UnexpectedResult('did not insert channel');
+ }
+ const channel = insertResult.rows[0];
+ const rankResult = await txCtx.result(this.statement.channelRankInsert, { channelId: channel.channelId });
+ if (rankResult.rowCount != 1) {
+ throw new DBErrors.UnexpectedResult('did not insert channel rank');
+ }
+ const { rank } = rankResult.rows[0];
+ this.logger.debug(_scope, 'success', { ...channel, rank });
+ return { ...channel, rank };
+ });
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, accountId, name, uid });
+ throw e;
+ }
+ }
+
+
+ async channelUpdate(dbCtx, accountId, uid, name) {
+ const _scope = _fileScope('channelUpdate');
+ this.logger.debug(_scope, 'called', { accountId, uid, name });
+
+ try {
+ const result = await dbCtx.result(this.statement.channelUpdate, { accountId, uid, name });
+ if (result.rowCount != 1) {
+ throw new DBErrors.UnexpectedResult('did not update channel');
+ }
+ return result.rows[0];
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, accountId, uid, name });
+ }
+ }
+
+
+ async channelDelete(dbCtx, accountId, uid) {
+ const _scope = _fileScope('channelDelete');
+ this.logger.debug(_scope, 'called', { accountId, uid });
+
+ try {
+ const result = await dbCtx.result(this.statement.channelDelete, { accountId, uid });
+ if (result.rowCount != 1) {
+ throw new DBErrors.UnexpectedResult('did not delete channel');
+ }
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, accountId, uid });
+ throw e;
+ }
+ }
+
+
+ async channelRankUpdate(dbCtx, channelId, rank) {
+ const _scope = _fileScope('channelRankUpdate');
+ this.logger.debug(_scope, 'called', { channelId, rank });
+
+ try {
+ const result = await dbCtx.result(this.statement.channelRankUpdate, { channelId, rank });
+ if (result.rowCount != 1) {
+ throw new DBErrors.UnexpectedResult('did not update channel rank');
+ }
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, channelId, rank });
+ throw e;
+ }
+ }
+
+
+ async channelsGetByAccountId(dbCtx, accountId) {
+ const _scope = _fileScope('channelsGetByAccountId');
+ this.logger.debug(_scope, 'called', { accountId });
+
+ try {
+ const channels = await dbCtx.manyOrNone(this.statement.channelsGetByAccountId, { accountId });
+ return channels || [];
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, accountId });
+ throw e;
+ }
+ }
+
+
+ /**
+ *
+ * @param {*} dbCtx
+ * @param {String} accountId
+ * @returns {Object[]}
+ */
+ async channelsUnreadGetByAccountId(dbCtx, accountId) {
+ const _scope = _fileScope('channelsUnreadGetByAccountId');
+ this.logger.debug(_scope, 'called', { accountId });
+
+ try {
+ const channels = await dbCtx.manyOrNone(this.statement.channelsUnreadGetByAccountId, { accountId });
+ return channels || [];
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, accountId });
+ throw e;
+ }
+ }
+
+ /*
+ async feedGetById(dbCtx, feedId) {
+ const _scope = _fileScope('feedGetById');
+ this.logger.debug(_scope, 'called', { feedId });
+
+ try {
+ return await dbCtx.oneOrNone(this.statement.feedGetById, { feedId });
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, feedId });
+ throw e;
+ }
+ }
+
+ async feedInsert(dbCtx, feedData) {
+ const _scope = _fileScope('feedInsert');
+ this.logger.debug(_scope, 'called', { feedData });
+
+ try {
+ const result = await dbCtx.result(this.statement.feedInsert, feedData);
+ if (result.rowCount != 1) {
+ throw new DBErrors.UnexpectedResult('did not insert feed');
+ }
+ return result.rows[0].feedId;
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, feedData });
+ throw e;
+ }
+ }
+ */
+
+}
+
+module.exports = DatabasePostgres;