const svh = require('../schema-version-helper');
const Database = require('../base');
const DBErrors = require('../errors');
+const Listener = require('./listener');
const common = require('../../common');
const _fileScope = common.fileScope(__filename);
// Suppress QF warnings when running tests
this.noWarnings = options.db.noWarnings;
+ if (options.db.cacheEnabled) {
+ this.listener = new Listener(logger, this.db, Object.assign({}, options.db.listener, {
+ channel: 'topic_changed',
+ dataCallback: this._topicChanged.bind(this),
+ connectionEstablishedCallback: this._listenerEstablished.bind(this),
+ connectionLostCallback: this._listenerLost.bind(this),
+ }));
+ }
+
// Log queries
const queryLogLevel = options.db.queryLogLevel;
if (queryLogLevel) {
pgpInitOptions.query = (event) => {
+ // Quell outgoing pings
+ if (event && event.query && event.query.startsWith('NOTIFY')) {
+ return;
+ }
this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event, ['query', 'params']) });
};
}
}
}
if (queryLogLevel) {
+ // Quell outgoing pings
+ if (result && result.command === 'NOTIFY') {
+ return;
+ }
// Omitting .rows
const resultLog = common.pick(result, ['command', 'rowCount', 'duration']);
this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog });
_queryFileHelper(_pgp) {
return (file) => {
const _scope = _fileScope('_queryFile');
+ /* istanbul ignore next */
const qfParams = {
minify: true,
...(this.noWarnings && { noWarnings: this.noWarnings }),
}
- async schemaCheck(applyMigrations = true) {
- const _scope = _fileScope('schemaCheck');
+ async initialize(applyMigrations = true) {
+ const _scope = _fileScope('initialize');
this.logger.debug(_scope, 'called', { applyMigrations });
if (applyMigrations) {
await this._initTables();
}
- await super.schemaCheck();
+ await super.initialize();
+ if (this.listener) {
+ await this.listener.start();
+ }
}
async _closeConnection() {
const _scope = _fileScope('_closeConnection');
try {
+ if (this.listener) {
+ await this.listener.stop();
+ }
await this._pgp.end();
} catch (e) {
this.logger.error(_scope, 'failed', { error: e });
}
+ /* istanbul ignore next */
async _purgeTables(really = false) {
const _scope = _fileScope('_purgeTables');
try {
}
+ /**
+ * Receive notices when topic entry is updated.
+ * Clear relevant cache entry.
+ * @param {String} payload
+ */
+ _topicChanged(payload) {
+ const _scope = _fileScope('_topicChanged');
+ if (payload !== 'ping') {
+ this.logger.debug(_scope, 'called', { payload });
+ this.cache.delete(payload);
+ }
+ }
+
+
+ /**
+ * Called when a listener connection is opened.
+ * Enable cache.
+ */
+ _listenerEstablished() {
+ const _scope = _fileScope('_listenerEstablished');
+ this.logger.debug(_scope, 'called', {});
+ this.cache = new Map();
+ }
+
+
+ /**
+ * Called when a listener connection is closed.
+ * Disable cache.
+ */
+ _listenerLost() {
+ const _scope = _fileScope('_listenerLost');
+ this.logger.debug(_scope, 'called', {});
+ delete this.cache;
+ }
+
+
+ /**
+ * Return a cached entry, if available.
+ * @param {*} key
+ */
+ _cacheGet(key) {
+ const _scope = _fileScope('_cacheGet');
+ if (this.cache && this.cache.has(key)) {
+ const cacheEntry = this.cache.get(key);
+ this.logger.debug(_scope, 'found cache entry', { key, ...common.pick(cacheEntry, ['added', 'hits', 'lastHit']) });
+ cacheEntry.hits += 1;
+ cacheEntry.lastHit = new Date();
+ return cacheEntry.data;
+ }
+ }
+
+
+ /**
+ * Store an entry in cache, if available.
+ * @param {*} key
+ * @param {*} data
+ */
+ _cacheSet(key, data) {
+ const _scope = _fileScope('_cacheSet');
+ if (this.cache) {
+ this.cache.set(key, {
+ added: new Date(),
+ hits: 0,
+ lastHit: undefined,
+ data,
+ });
+ this.logger.debug(_scope, 'added cache entry', { key });
+ }
+ }
+
+
async context(fn) {
return this.db.task(async (t) => fn(t));
}
let topics;
try {
topics = await dbCtx.manyOrNone(this.statement.topicGetInfoAll);
- } catch (e) {
+ } catch (e) {
this.logger.error(_scope, 'failed', { error: e, topics });
throw e;
}
let topic;
try {
+ topic = this._cacheGet(topicId);
+ if (topic) {
+ return topic;
+ }
topic = await dbCtx.oneOrNone(this.statement.topicGetContentById, { topicId });
- return this._topicDefaults(topic);
+ const topicWithDefaults = this._topicDefaults(topic);
+ this._cacheSet(topicId, topicWithDefaults);
+ return topicWithDefaults;
} catch (e) {
this.logger.error(_scope, 'failed', { error: e, topic, topicId });
throw e;