X-Git-Url: https://git.squeep.com/?a=blobdiff_plain;f=src%2Fdb%2Fpostgres%2Findex.js;h=78a4dab6fb775bdf69ba8faaea25fc5e129d6914;hb=3ca7fccb306d0b23626befc3791ffa360b3db1e7;hp=e70aeb7b7883e2700dceb7ce7599d069bf8a515c;hpb=43898cdd317a127bc45e8b3cb2f160df386760a1;p=websub-hub diff --git a/src/db/postgres/index.js b/src/db/postgres/index.js index e70aeb7..78a4dab 100644 --- a/src/db/postgres/index.js +++ b/src/db/postgres/index.js @@ -29,8 +29,8 @@ const schemaVersionsSupported = { }, max: { major: 1, - minor: 0, - patch: 4, + minor: 1, + patch: 0, }, }; @@ -45,12 +45,13 @@ class DatabasePostgres extends Database { this.noWarnings = options.db.noWarnings; if (options.db.cacheEnabled) { - this.listener = new Listener(logger, this.db, Object.assign({}, options.db.listener, { + this.listener = new Listener(logger, this.db, { + ...options.db.listener, channel: 'topic_changed', dataCallback: this._topicChanged.bind(this), connectionEstablishedCallback: this._listenerEstablished.bind(this), connectionLostCallback: this._listenerLost.bind(this), - })); + }); } // Log queries @@ -58,20 +59,22 @@ class DatabasePostgres extends Database { if (queryLogLevel) { pgpInitOptions.query = (event) => { // Quell outgoing pings - if (event && event.query && event.query.startsWith('NOTIFY')) { + if (event?.query?.startsWith('NOTIFY')) { return; } - this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event, ['query', 'params']) }); + this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event || {}, ['query', 'params']) }); }; } // Log errors pgpInitOptions.error = (err, event) => { this.logger.error(_fileScope('pgp:error'), '', { err, event }); + + // TODO: close connection on err.code === '57P03' database shutting down }; // Deophidiate column names in-place, log results - pgpInitOptions.receive = (data, result, event) => { + pgpInitOptions.receive = ({ data, result, ctx: event }) => { const exemplaryRow = data[0]; for (const prop in exemplaryRow) { const camel = Database._camelfy(prop); @@ -88,7 +91,7 @@ class DatabasePostgres extends Database { return; } // Omitting .rows - const resultLog = common.pick(result, ['command', 'rowCount', 'duration']); + const resultLog = common.pick(result || {}, ['command', 'rowCount', 'duration']); this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog }); } }; @@ -254,7 +257,7 @@ class DatabasePostgres extends Database { /** * Receive notices when topic entry is updated. * Clear relevant cache entry. - * @param {String} payload + * @param {string} payload topic changed event */ _topicChanged(payload) { const _scope = _fileScope('_topicChanged'); @@ -289,11 +292,12 @@ class DatabasePostgres extends Database { /** * Return a cached entry, if available. - * @param {*} key + * @param {*} key key + * @returns {object=} cached data */ _cacheGet(key) { const _scope = _fileScope('_cacheGet'); - if (this.cache && this.cache.has(key)) { + if (this.cache?.has(key)) { const cacheEntry = this.cache.get(key); this.logger.debug(_scope, 'found cache entry', { key, ...common.pick(cacheEntry, ['added', 'hits', 'lastHit']) }); cacheEntry.hits += 1; @@ -305,8 +309,8 @@ class DatabasePostgres extends Database { /** * Store an entry in cache, if available. - * @param {*} key - * @param {*} data + * @param {*} key key + * @param {*} data data */ _cacheSet(key, data) { const _scope = _fileScope('_cacheSet'); @@ -365,17 +369,36 @@ class DatabasePostgres extends Database { } - async authenticationUpsert(dbCtx, identifier, credential) { + async authenticationUpsert(dbCtx, identifier, credential, otpKey) { const _scope = _fileScope('authenticationUpsert'); const scrubbedCredential = '*'.repeat((credential || '').length); - this.logger.debug(_scope, 'called', { identifier, scrubbedCredential }); + const scrubbedOTPKey = '*'.repeat((otpKey || '').length) || null; + this.logger.debug(_scope, 'called', { identifier, scrubbedCredential, scrubbedOTPKey }); let result; try { - result = await dbCtx.result(this.statement.authenticationUpsert, { identifier, credential }); + result = await dbCtx.result(this.statement.authenticationUpsert, { identifier, credential, otpKey }); if (result.rowCount != 1) { throw new DBErrors.UnexpectedResult('did not upsert authentication'); } + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential, scrubbedOTPKey }); + throw e; + } + } + + + async authenticationUpdateCredential(dbCtx, identifier, credential) { + const _scope = _fileScope('authenticationUpdateCredential'); + const scrubbedCredential = '*'.repeat((credential || '').length); + this.logger.debug(_scope, 'called', { identifier, scrubbedCredential }); + + let result; + try { + result = await dbCtx.result(this.statement.authenticationUpdateCredential, { identifier, credential }); + if (result.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not update authentication credential'); + } } catch (e) { this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential }); throw e; @@ -383,6 +406,24 @@ class DatabasePostgres extends Database { } + async authenticationUpdateOTPKey(dbCtx, identifier, otpKey) { + const _scope = _fileScope('authenticationUpdateOTPKey'); + const scrubbedOTPKey = '*'.repeat((otpKey || '').length) || null; + this.logger.debug(_scope, 'called', { identifier, scrubbedOTPKey }); + + let result; + try { + result = await dbCtx.result(this.statement.authenticationUpdateOtpKey, { identifier, otpKey }); + if (result.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not update authentication otp key'); + } + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedOTPKey }); + throw e; + } + } + + async subscriptionsByTopicId(dbCtx, topicId) { const _scope = _fileScope('subscriptionsByTopicId'); this.logger.debug(_scope, 'called', { topicId });