X-Git-Url: https://git.squeep.com/?a=blobdiff_plain;f=src%2Fdb%2Fpostgres%2Findex.js;h=78a4dab6fb775bdf69ba8faaea25fc5e129d6914;hb=3ca7fccb306d0b23626befc3791ffa360b3db1e7;hp=213fa1031c3c44fd29c48a46a8af86ff8bed6217;hpb=9812213260e952ae601f94ab0915c680e8c80495;p=websub-hub diff --git a/src/db/postgres/index.js b/src/db/postgres/index.js index 213fa10..78a4dab 100644 --- a/src/db/postgres/index.js +++ b/src/db/postgres/index.js @@ -29,8 +29,8 @@ const schemaVersionsSupported = { }, max: { major: 1, - minor: 0, - patch: 1, + minor: 1, + patch: 0, }, }; @@ -45,12 +45,13 @@ class DatabasePostgres extends Database { this.noWarnings = options.db.noWarnings; if (options.db.cacheEnabled) { - this.listener = new Listener(logger, this.db, Object.assign({}, options.db.listener, { + this.listener = new Listener(logger, this.db, { + ...options.db.listener, channel: 'topic_changed', dataCallback: this._topicChanged.bind(this), connectionEstablishedCallback: this._listenerEstablished.bind(this), connectionLostCallback: this._listenerLost.bind(this), - })); + }); } // Log queries @@ -58,20 +59,22 @@ class DatabasePostgres extends Database { if (queryLogLevel) { pgpInitOptions.query = (event) => { // Quell outgoing pings - if (event && event.query && event.query.startsWith('NOTIFY')) { + if (event?.query?.startsWith('NOTIFY')) { return; } - this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event, ['query', 'params']) }); + this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event || {}, ['query', 'params']) }); }; } // Log errors pgpInitOptions.error = (err, event) => { this.logger.error(_fileScope('pgp:error'), '', { err, event }); + + // TODO: close connection on err.code === '57P03' database shutting down }; // Deophidiate column names in-place, log results - pgpInitOptions.receive = (data, result, event) => { + pgpInitOptions.receive = ({ data, result, ctx: event }) => { const exemplaryRow = data[0]; for (const prop in exemplaryRow) { const camel = Database._camelfy(prop); @@ -88,7 +91,7 @@ class DatabasePostgres extends Database { return; } // Omitting .rows - const resultLog = common.pick(result, ['command', 'rowCount', 'duration']); + const resultLog = common.pick(result || {}, ['command', 'rowCount', 'duration']); this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog }); } }; @@ -162,10 +165,16 @@ class DatabasePostgres extends Database { this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted }); for (const v of migrationsWanted) { const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql'); - const migrationSql = _queryFile(fPath); - const results = await this.db.multiResult(migrationSql); - this.logger.debug(_scope, 'executed migration sql', { version: v, results }); - this.logger.info(_scope, 'applied migration', { version: v }); + try { + const migrationSql = _queryFile(fPath); + this.logger.debug(_scope, 'applying migration', { version: v }); + const results = await this.db.multiResult(migrationSql); + this.logger.debug(_scope, 'migration results', { results }); + this.logger.info(_scope, 'applied migration', { version: v }); + } catch (e) { + this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v }); + throw e; + } } } @@ -248,7 +257,7 @@ class DatabasePostgres extends Database { /** * Receive notices when topic entry is updated. * Clear relevant cache entry. - * @param {String} payload + * @param {string} payload topic changed event */ _topicChanged(payload) { const _scope = _fileScope('_topicChanged'); @@ -283,11 +292,12 @@ class DatabasePostgres extends Database { /** * Return a cached entry, if available. - * @param {*} key + * @param {*} key key + * @returns {object=} cached data */ _cacheGet(key) { const _scope = _fileScope('_cacheGet'); - if (this.cache && this.cache.has(key)) { + if (this.cache?.has(key)) { const cacheEntry = this.cache.get(key); this.logger.debug(_scope, 'found cache entry', { key, ...common.pick(cacheEntry, ['added', 'hits', 'lastHit']) }); cacheEntry.hits += 1; @@ -299,8 +309,8 @@ class DatabasePostgres extends Database { /** * Store an entry in cache, if available. - * @param {*} key - * @param {*} data + * @param {*} key key + * @param {*} data data */ _cacheSet(key, data) { const _scope = _fileScope('_cacheSet'); @@ -359,19 +369,56 @@ class DatabasePostgres extends Database { } - async authenticationUpsert(dbCtx, identifier, credential) { + async authenticationUpsert(dbCtx, identifier, credential, otpKey) { const _scope = _fileScope('authenticationUpsert'); const scrubbedCredential = '*'.repeat((credential || '').length); - this.logger.debug(_scope, 'called', { identifier, scrubbedCredential }); + const scrubbedOTPKey = '*'.repeat((otpKey || '').length) || null; + this.logger.debug(_scope, 'called', { identifier, scrubbedCredential, scrubbedOTPKey }); let result; try { - result = await dbCtx.result(this.statement.authenticationUpsert, { identifier, credential }); + result = await dbCtx.result(this.statement.authenticationUpsert, { identifier, credential, otpKey }); if (result.rowCount != 1) { throw new DBErrors.UnexpectedResult('did not upsert authentication'); } } catch (e) { - this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential }) + this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential, scrubbedOTPKey }); + throw e; + } + } + + + async authenticationUpdateCredential(dbCtx, identifier, credential) { + const _scope = _fileScope('authenticationUpdateCredential'); + const scrubbedCredential = '*'.repeat((credential || '').length); + this.logger.debug(_scope, 'called', { identifier, scrubbedCredential }); + + let result; + try { + result = await dbCtx.result(this.statement.authenticationUpdateCredential, { identifier, credential }); + if (result.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not update authentication credential'); + } + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential }); + throw e; + } + } + + + async authenticationUpdateOTPKey(dbCtx, identifier, otpKey) { + const _scope = _fileScope('authenticationUpdateOTPKey'); + const scrubbedOTPKey = '*'.repeat((otpKey || '').length) || null; + this.logger.debug(_scope, 'called', { identifier, scrubbedOTPKey }); + + let result; + try { + result = await dbCtx.result(this.statement.authenticationUpdateOtpKey, { identifier, otpKey }); + if (result.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not update authentication otp key'); + } + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedOTPKey }); throw e; } } @@ -473,14 +520,14 @@ class DatabasePostgres extends Database { } - async subscriptionDeliveryComplete(dbCtx, callback, topicId) { + async subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) { const _scope = _fileScope('subscriptionDeliveryComplete'); - this.logger.debug(_scope, 'called', { callback, topicId }); + this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated }); let result; try { await dbCtx.txIf(async (txCtx) => { - result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId }); + result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId, topicContentUpdated }); if (result.rowCount != 1) { throw new DBErrors.UnexpectedResult('did not set subscription delivery success'); } @@ -490,7 +537,7 @@ class DatabasePostgres extends Database { } }); } catch (e) { - this.logger.error(_scope, 'failed', { error: e, callback, topicId }); + this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated }); throw e; } } @@ -783,14 +830,17 @@ class DatabasePostgres extends Database { } - async topicGetByUrl(dbCtx, topicUrl) { + async topicGetByUrl(dbCtx, topicUrl, applyDefaults = true) { const _scope = _fileScope('topicGetByUrl'); this.logger.debug(_scope, 'called', { topicUrl }); let topic; try { topic = await dbCtx.oneOrNone(this.statement.topicGetByUrl, { topicUrl }); - return this._topicDefaults(topic); + if (applyDefaults) { + topic = this._topicDefaults(topic); + } + return topic; } catch (e) { this.logger.error(_scope, 'failed', { error: e, topic, topicUrl }); throw e; @@ -850,6 +900,18 @@ class DatabasePostgres extends Database { } + async topicPublishHistory(dbCtx, topicId, days) { + const _scope = _fileScope('topicPublishHistory'); + this.logger.debug(_scope, 'called', { topicId, days }); + + const events = await dbCtx.manyOrNone(this.statement.topicPublishHistory, { topicIds: [topicId], daysAgo: days }); + const history = Array.from({ length: days }, () => 0); + events.forEach(({ daysAgo, contentUpdates }) => history[daysAgo] = Number(contentUpdates)); + + return history; + } + + async topicSet(dbCtx, data) { const _scope = _fileScope('topicSet'); this.logger.debug(_scope, 'called', data); @@ -882,6 +944,8 @@ class DatabasePostgres extends Database { const _scope = _fileScope('topicSetContent'); const topicSetContentData = { contentType: null, + httpETag: null, + httpLastModified: null, ...data, }; const logData = { @@ -898,6 +962,14 @@ class DatabasePostgres extends Database { if (result.rowCount != 1) { throw new DBErrors.UnexpectedResult('did not set topic content'); } + result = await dbCtx.result(this.statement.topicSetContentHistory, { + topicId: data.topicId, + contentHash: data.contentHash, + contentSize: data.content.length, + }); + if (result.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not set topic content history'); + } this.logger.debug(_scope, 'success', { ...logData }); return this._engineInfo(result); } catch (e) {