X-Git-Url: http://git.squeep.com/?p=websub-hub;a=blobdiff_plain;f=src%2Fdb%2Fpostgres%2Findex.js;h=34511102ec5a0e596d0408e2a2bb8bfedeb94eab;hp=d02d98165360f33819e220b195e3af0d18c26cd2;hb=71efac9dcd7dc219cb83799391e7adc63cd4c662;hpb=4f64b8910e1295207a42c757cb81c9b0e9ee3be2 diff --git a/src/db/postgres/index.js b/src/db/postgres/index.js index d02d981..3451110 100644 --- a/src/db/postgres/index.js +++ b/src/db/postgres/index.js @@ -10,6 +10,7 @@ const pgp = require('pg-promise')(pgpInitOptions); const svh = require('../schema-version-helper'); const Database = require('../base'); const DBErrors = require('../errors'); +const Listener = require('./listener'); const common = require('../../common'); const _fileScope = common.fileScope(__filename); @@ -29,7 +30,7 @@ const schemaVersionsSupported = { max: { major: 1, minor: 0, - patch: 0, + patch: 4, }, }; @@ -43,10 +44,23 @@ class DatabasePostgres extends Database { // Suppress QF warnings when running tests this.noWarnings = options.db.noWarnings; + if (options.db.cacheEnabled) { + this.listener = new Listener(logger, this.db, Object.assign({}, options.db.listener, { + channel: 'topic_changed', + dataCallback: this._topicChanged.bind(this), + connectionEstablishedCallback: this._listenerEstablished.bind(this), + connectionLostCallback: this._listenerLost.bind(this), + })); + } + // Log queries const queryLogLevel = options.db.queryLogLevel; if (queryLogLevel) { pgpInitOptions.query = (event) => { + // Quell outgoing pings + if (event && event.query && event.query.startsWith('NOTIFY')) { + return; + } this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event, ['query', 'params']) }); }; } @@ -69,6 +83,10 @@ class DatabasePostgres extends Database { } } if (queryLogLevel) { + // Quell outgoing pings + if (result && result.command === 'NOTIFY') { + return; + } // Omitting .rows const resultLog = common.pick(result, ['command', 'rowCount', 'duration']); this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog }); @@ -86,6 +104,7 @@ class DatabasePostgres extends Database { _queryFileHelper(_pgp) { return (file) => { const _scope = _fileScope('_queryFile'); + /* istanbul ignore next */ const qfParams = { minify: true, ...(this.noWarnings && { noWarnings: this.noWarnings }), @@ -107,6 +126,9 @@ class DatabasePostgres extends Database { await this._initTables(); } await super.initialize(); + if (this.listener) { + await this.listener.start(); + } } @@ -140,10 +162,16 @@ class DatabasePostgres extends Database { this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted }); for (const v of migrationsWanted) { const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql'); - const migrationSql = _queryFile(fPath); - const results = await this.db.multiResult(migrationSql); - this.logger.debug(_scope, 'executed migration sql', { version: v, results }); - this.logger.info(_scope, 'applied migration', { version: v }); + try { + const migrationSql = _queryFile(fPath); + this.logger.debug(_scope, 'applying migration', { version: v }); + const results = await this.db.multiResult(migrationSql); + this.logger.debug(_scope, 'migration results', { results }); + this.logger.info(_scope, 'applied migration', { version: v }); + } catch (e) { + this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v }); + throw e; + } } } @@ -173,6 +201,9 @@ class DatabasePostgres extends Database { async _closeConnection() { const _scope = _fileScope('_closeConnection'); try { + if (this.listener) { + await this.listener.stop(); + } await this._pgp.end(); } catch (e) { this.logger.error(_scope, 'failed', { error: e }); @@ -181,6 +212,7 @@ class DatabasePostgres extends Database { } + /* istanbul ignore next */ async _purgeTables(really = false) { const _scope = _fileScope('_purgeTables'); try { @@ -219,6 +251,77 @@ class DatabasePostgres extends Database { } + /** + * Receive notices when topic entry is updated. + * Clear relevant cache entry. + * @param {String} payload + */ + _topicChanged(payload) { + const _scope = _fileScope('_topicChanged'); + if (payload !== 'ping') { + this.logger.debug(_scope, 'called', { payload }); + this.cache.delete(payload); + } + } + + + /** + * Called when a listener connection is opened. + * Enable cache. + */ + _listenerEstablished() { + const _scope = _fileScope('_listenerEstablished'); + this.logger.debug(_scope, 'called', {}); + this.cache = new Map(); + } + + + /** + * Called when a listener connection is closed. + * Disable cache. + */ + _listenerLost() { + const _scope = _fileScope('_listenerLost'); + this.logger.debug(_scope, 'called', {}); + delete this.cache; + } + + + /** + * Return a cached entry, if available. + * @param {*} key + */ + _cacheGet(key) { + const _scope = _fileScope('_cacheGet'); + if (this.cache && this.cache.has(key)) { + const cacheEntry = this.cache.get(key); + this.logger.debug(_scope, 'found cache entry', { key, ...common.pick(cacheEntry, ['added', 'hits', 'lastHit']) }); + cacheEntry.hits += 1; + cacheEntry.lastHit = new Date(); + return cacheEntry.data; + } + } + + + /** + * Store an entry in cache, if available. + * @param {*} key + * @param {*} data + */ + _cacheSet(key, data) { + const _scope = _fileScope('_cacheSet'); + if (this.cache) { + this.cache.set(key, { + added: new Date(), + hits: 0, + lastHit: undefined, + data, + }); + this.logger.debug(_scope, 'added cache entry', { key }); + } + } + + async context(fn) { return this.db.task(async (t) => fn(t)); } @@ -324,6 +427,21 @@ class DatabasePostgres extends Database { } + async subscriptionDeleteExpired(dbCtx, topicId) { + const _scope = _fileScope('subscriptionDeleteExpired'); + this.logger.debug(_scope, 'called', { topicId }); + + try { + const result = await dbCtx.result(this.statement.subscriptionDeleteExpired, { topicId }); + this.logger.debug(_scope, 'success', { topicId, deleted: result.rowCount }); + return this._engineInfo(result); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, topicId }); + throw e; + } + } + + async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) { const _scope = _fileScope('subscriptionDeliveryClaim'); this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant }); @@ -361,14 +479,14 @@ class DatabasePostgres extends Database { } - async subscriptionDeliveryComplete(dbCtx, callback, topicId) { + async subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) { const _scope = _fileScope('subscriptionDeliveryComplete'); - this.logger.debug(_scope, 'called', { callback, topicId }); + this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated }); let result; try { await dbCtx.txIf(async (txCtx) => { - result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId }); + result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId, topicContentUpdated }); if (result.rowCount != 1) { throw new DBErrors.UnexpectedResult('did not set subscription delivery success'); } @@ -378,7 +496,7 @@ class DatabasePostgres extends Database { } }); } catch (e) { - this.logger.error(_scope, 'failed', { error: e, callback, topicId }); + this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated }); throw e; } } @@ -642,7 +760,7 @@ class DatabasePostgres extends Database { let topics; try { topics = await dbCtx.manyOrNone(this.statement.topicGetInfoAll); - } catch (e) { + } catch (e) { this.logger.error(_scope, 'failed', { error: e, topics }); throw e; } @@ -692,8 +810,14 @@ class DatabasePostgres extends Database { let topic; try { + topic = this._cacheGet(topicId); + if (topic) { + return topic; + } topic = await dbCtx.oneOrNone(this.statement.topicGetContentById, { topicId }); - return this._topicDefaults(topic); + const topicWithDefaults = this._topicDefaults(topic); + this._cacheSet(topicId, topicWithDefaults); + return topicWithDefaults; } catch (e) { this.logger.error(_scope, 'failed', { error: e, topic, topicId }); throw e; @@ -701,6 +825,49 @@ class DatabasePostgres extends Database { } + async topicPendingDelete(dbCtx, topicId) { + const _scope = _fileScope('topicPendingDelete'); + this.logger.debug(_scope, 'called', { topicId }); + + try { + await dbCtx.txIf(async (txCtx) => { + const topic = await txCtx.one(this.statement.topicGetById, { topicId }); + if (!topic.isDeleted) { + this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId }); + return; + } + + const { count: subscriberCount } = await txCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl: topic.url }); + if (subscriberCount) { + this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount }); + return; + } + + const result = await txCtx.result(this.statement.topicDeleteById, { topicId }); + if (result.rowCount !== 1) { + throw new DBErrors.UnexpectedResult('did not delete topic'); + } + }); + this.logger.debug(_scope, 'success', { topicId }); + } catch (e) { + this.logger.error(_scope, 'failed', { error: e, topicId }); + throw e; + } + } + + + async topicPublishHistory(dbCtx, topicId, days) { + const _scope = _fileScope('topicPublishHistory'); + this.logger.debug(_scope, 'called', { topicId, days }); + + const events = await dbCtx.manyOrNone(this.statement.topicPublishHistory, { topicIds: [topicId], daysAgo: days }); + const history = Array.from({ length: days }, () => 0); + events.forEach(({ daysAgo, contentUpdates }) => history[daysAgo] = Number(contentUpdates)); + + return history; + } + + async topicSet(dbCtx, data) { const _scope = _fileScope('topicSet'); this.logger.debug(_scope, 'called', data); @@ -733,6 +900,8 @@ class DatabasePostgres extends Database { const _scope = _fileScope('topicSetContent'); const topicSetContentData = { contentType: null, + httpETag: null, + httpLastModified: null, ...data, }; const logData = { @@ -749,6 +918,14 @@ class DatabasePostgres extends Database { if (result.rowCount != 1) { throw new DBErrors.UnexpectedResult('did not set topic content'); } + result = await dbCtx.result(this.statement.topicSetContentHistory, { + topicId: data.topicId, + contentHash: data.contentHash, + contentSize: data.content.length, + }); + if (result.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not set topic content history'); + } this.logger.debug(_scope, 'success', { ...logData }); return this._engineInfo(result); } catch (e) {