From 3fae9b64c5c70ad52376558caa71db99778541b0 Mon Sep 17 00:00:00 2001
From: Justin Wind
Date: Thu, 5 Aug 2021 15:11:21 -0700
Subject: [PATCH] add caching for topic content db calls (Postgres only)

Cache topicGetContentById responses, to avoid repeated large-payload db
calls when updating subscribers.  Currently only enabled for Postgres,
which uses the LISTEN/NOTIFY mechanism to invalidate cache entries when a
topic is updated, ensuring data consistency.
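For reference, invalidation relies on the topic table emitting a
notification on the 'topic_changed' channel whenever a row changes.  No
such trigger is included in this patch; something along the lines of the
following sketch is assumed to exist in the schema (table and column
names here are illustrative):

    -- Illustrative only; not part of this patch.
    CREATE OR REPLACE FUNCTION topic_changed_notify() RETURNS TRIGGER AS $$
    BEGIN
      -- The payload is the topic id, which is also the cache key.
      PERFORM pg_notify('topic_changed', NEW.id::text);
      RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;

    CREATE TRIGGER topic_changed
      AFTER UPDATE ON topic
      FOR EACH ROW EXECUTE PROCEDURE topic_changed_notify();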
---
 bin/authUserAdd.js               |   1 +
 config/default.js                |   6 +
 config/test.js                   |   1 +
 src/db/postgres/index.js         | 105 +++++++++++++++-
 src/db/postgres/listener.js      | 169 ++++++++++++++++++++++++++
 test/src/db/postgres-listener.js | 197 +++++++++++++++++++++++++++++++
 test/src/db/postgres.js          | 170 +++++++++++++++++++++++++-
 7 files changed, 646 insertions(+), 3 deletions(-)
 create mode 100644 src/db/postgres/listener.js
 create mode 100644 test/src/db/postgres-listener.js

diff --git a/bin/authUserAdd.js b/bin/authUserAdd.js
index 4eaff65..1c33be8 100644
--- a/bin/authUserAdd.js
+++ b/bin/authUserAdd.js
@@ -44,6 +44,7 @@ async function readPassword(prompt) {
 }
 
 (async () => {
+  await db.initialize();
   const password = await readPassword('password: ');
   const credential = await argon2.hash(password, { type: argon2.argon2id });
   console.log(`\t${identifier}:${credential}`);
diff --git a/config/default.js b/config/default.js
index 5c7091f..1905315 100644
--- a/config/default.js
+++ b/config/default.js
@@ -22,6 +22,12 @@ const defaultOptions = {
   db: {
     connectionString: '', // e.g. sqlite://path/to/dbfile.sqlite
     queryLogLevel: undefined, // Set to log queries
+    cacheEnabled: true, // Cache some db responses. (Postgres only)
+    listener: { // Settings for the cache-invalidator connection. (Postgres only)
+      // pingDelayMs: 5000, // Connection keep-alive/health-check.
+      // reconnectDelayMs: 6000, // Wait time before attempting reconnection.
+      // reconnectTimes: 10, // Retries limit.
+    },
   },
 
   // Logging options
diff --git a/config/test.js b/config/test.js
index 091b6c0..3ff1259 100644
--- a/config/test.js
+++ b/config/test.js
@@ -8,5 +8,6 @@ module.exports = {
   },
   db: {
     queryLogLevel: 'debug',
+    cacheEnabled: false,
   },
 };
diff --git a/src/db/postgres/index.js b/src/db/postgres/index.js
index d02d981..e6b5f5b 100644
--- a/src/db/postgres/index.js
+++ b/src/db/postgres/index.js
@@ -10,6 +10,7 @@ const pgp = require('pg-promise')(pgpInitOptions);
 const svh = require('../schema-version-helper');
 const Database = require('../base');
 const DBErrors = require('../errors');
+const Listener = require('./listener');
 const common = require('../../common');
 
 const _fileScope = common.fileScope(__filename);
@@ -43,10 +44,23 @@ class DatabasePostgres extends Database {
     // Suppress QF warnings when running tests
     this.noWarnings = options.db.noWarnings;
 
+    if (options.db.cacheEnabled) {
+      this.listener = new Listener(logger, this.db, Object.assign({}, options.db.listener, {
+        channel: 'topic_changed',
+        dataCallback: this._topicChanged.bind(this),
+        connectionEstablishedCallback: this._listenerEstablished.bind(this),
+        connectionLostCallback: this._listenerLost.bind(this),
+      }));
+    }
+
     // Log queries
     const queryLogLevel = options.db.queryLogLevel;
     if (queryLogLevel) {
       pgpInitOptions.query = (event) => {
+        // Quell outgoing pings
+        if (event && event.query && event.query.startsWith('NOTIFY')) {
+          return;
+        }
         this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event, ['query', 'params']) });
       };
     }
@@ -69,6 +83,10 @@
         }
       }
       if (queryLogLevel) {
+        // Quell outgoing pings
+        if (result && result.command === 'NOTIFY') {
+          return;
+        }
         // Omitting .rows
         const resultLog = common.pick(result, ['command', 'rowCount', 'duration']);
         this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog });
@@ -86,6 +104,7 @@
   _queryFileHelper(_pgp) {
     return (file) => {
       const _scope = _fileScope('_queryFile');
+      /* istanbul ignore next */
       const qfParams = {
         minify: true,
         ...(this.noWarnings && { noWarnings: this.noWarnings }),
@@ -107,6 +126,9 @@
       await this._initTables();
     }
     await super.initialize();
+    if (this.listener) {
+      await this.listener.start();
+    }
   }
 
 
@@ -173,6 +195,9 @@
   async _closeConnection() {
     const _scope = _fileScope('_closeConnection');
     try {
+      if (this.listener) {
+        await this.listener.stop();
+      }
       await this._pgp.end();
     } catch (e) {
       this.logger.error(_scope, 'failed', { error: e });
@@ -181,6 +206,7 @@
   }
 
 
+  /* istanbul ignore next */
   async _purgeTables(really = false) {
     const _scope = _fileScope('_purgeTables');
     try {
@@ -219,6 +245,77 @@
     }
   }
 
+
+  /**
+   * Receive notices when topic entry is updated.
+   * Clear relevant cache entry.
+   * @param {String} payload
+   */
+  _topicChanged(payload) {
+    const _scope = _fileScope('_topicChanged');
+    if (payload !== 'ping') {
+      this.logger.debug(_scope, 'called', { payload });
+      this.cache.delete(payload);
+    }
+  }
+
+
+  /**
+   * Called when a listener connection is opened.
+   * Enable cache.
+   */
+  _listenerEstablished() {
+    const _scope = _fileScope('_listenerEstablished');
+    this.logger.debug(_scope, 'called', {});
+    this.cache = new Map();
+  }
+
+
+  /**
+   * Called when a listener connection is closed.
+   * Disable cache.
+   */
+  _listenerLost() {
+    const _scope = _fileScope('_listenerLost');
+    this.logger.debug(_scope, 'called', {});
+    delete this.cache;
+  }
+
+
+  /**
+   * Return a cached entry, if available.
+   * @param {*} key
+   */
+  _cacheGet(key) {
+    const _scope = _fileScope('_cacheGet');
+    if (this.cache && this.cache.has(key)) {
+      const cacheEntry = this.cache.get(key);
+      this.logger.debug(_scope, 'found cache entry', { key, ...common.pick(cacheEntry, ['added', 'hits', 'lastHit']) });
+      cacheEntry.hits += 1;
+      cacheEntry.lastHit = new Date();
+      return cacheEntry.data;
+    }
+  }
+
+
+  /**
+   * Store an entry in cache, if available.
+   * @param {*} key
+   * @param {*} data
+   */
+  _cacheSet(key, data) {
+    const _scope = _fileScope('_cacheSet');
+    if (this.cache) {
+      this.cache.set(key, {
+        added: new Date(),
+        hits: 0,
+        lastHit: undefined,
+        data,
+      });
+      this.logger.debug(_scope, 'added cache entry', { key });
+    }
+  }
+
+
   async context(fn) {
     return this.db.task(async (t) => fn(t));
   }
@@ -692,8 +789,14 @@
 
     let topic;
     try {
+      topic = this._cacheGet(topicId);
+      if (topic) {
+        return topic;
+      }
       topic = await dbCtx.oneOrNone(this.statement.topicGetContentById, { topicId });
-      return this._topicDefaults(topic);
+      const topicWithDefaults = this._topicDefaults(topic);
+      this._cacheSet(topicId, topicWithDefaults);
+      return topicWithDefaults;
     } catch (e) {
       this.logger.error(_scope, 'failed', { error: e, topic, topicId });
       throw e;
diff --git a/src/db/postgres/listener.js b/src/db/postgres/listener.js
new file mode 100644
index 0000000..6ad387a
--- /dev/null
+++ b/src/db/postgres/listener.js
@@ -0,0 +1,169 @@
+'use strict';
+
+const common = require('../../common');
+
+const _fileScope = common.fileScope(__filename);
+
+
+const defaultOptions = {
+  channel: 'cache_invalidation',
+  dataCallback: common.nop,
+  connectionLostCallback: common.nop,
+  connectionEstablishedCallback: common.nop,
+  pingDelayMs: 5000,
+  reconnectDelayMs: 6000,
+  reconnectTimes: 10,
+};
+
+/**
+ * Create a robust connection which listens to a notification channel.
+ */
+class PostgresListener {
+  constructor(logger, db, options) {
+    this.logger = logger;
+    this.db = db;
+
+    this.options = Object.assign({}, defaultOptions, options);
+    this.notificationEventName = 'notification';
+
+    this.connection = null;
+    this.nextPingTimeout = undefined;
+
+    this._onConnectionLostBound = this._onConnectionLost.bind(this);
+    this._onNotificationBound = this._onNotification.bind(this);
+  }
+
+
+  /**
+   * Establish the listener connection.
+   */
+  async start() {
+    await this._reconnect(0, 1);
+    this._sendPing();
+  }
+
+
+  /**
+   * Shut down the listener connection.
+   */
+  async stop() {
+    const _scope = _fileScope('stop');
+    if (this.reconnectPending) {
+      this.logger.debug(_scope, 'overriding existing reconnect retry');
+      clearTimeout(this.reconnectPending);
+      delete this.reconnectPending;
+    }
+    if (this.connection) {
+      this.connection.client.removeListener(this.notificationEventName, this._onNotificationBound);
+      this.connection.done();
+      this.connection = null;
+      await this.options.connectionLostCallback();
+    }
+  }
+
+
+  /**
+   * Begin sending connection pings.
+   */
+  _sendPing() {
+    const _scope = _fileScope('_sendPing');
+    this.nextPingTimeout = setTimeout(async () => {
+      try {
+        if (this.connection) {
+          await this.connection.none('NOTIFY $(channel:name), $(payload)', { channel: this.options.channel, payload: 'ping' });
+        }
+      } catch (e) {
+        this.logger.error(_scope, 'failed', { error: e });
+      } finally {
+        this._sendPing();
+      }
+    }, this.options.pingDelayMs);
+  }
+
+
+  /**
+   * Notification callback.
+   * @param {Object} data
+   */
+  async _onNotification(data) {
+    const _scope = _fileScope('_onNotification');
+    // Ignore our own messages
+    if (data.payload === 'ping') {
+      return;
+    }
+    this.logger.debug(_scope, 'called', data);
+    await this.options.dataCallback(data.payload);
+  }
+
+
+  /**
+   * Notify callback of a lost connection, then attempt to reconnect.
+   * @param {*} error
+   * @param {*} event
+   */
+  async _onConnectionLost(error, event) {
+    const _scope = _fileScope('_onConnectionLost');
+    this.logger.error(_scope, 'listener connection lost', { error, event });
+    this.connection = null;
+    try {
+      event.client.removeListener(this.notificationEventName, this._onNotificationBound);
+    } catch (e) {
+      this.logger.error(_scope, 'failed to remove listener', { error: e });
+      // That's okay, it was probably just gone anyhow.
+    }
+    await this.options.connectionLostCallback();
+    try {
+      await this._reconnect(this.options.reconnectDelayMs, this.options.reconnectTimes);
+    } catch (e) {
+      this.logger.error(_scope, 'failed to reconnect listener', { error: e });
+    }
+  }
+
+
+  /**
+   * Schedule an attempt to establish a connection.
+   * @param {Number} delay
+   * @param {Number} retriesRemaining
+   */
+  async _reconnect(delay, retriesRemaining) {
+    const _scope = _fileScope('_reconnect');
+    if (this.connection) {
+      this.logger.debug(_scope, 'closing existing connection');
+      this.connection.done();
+      this.connection = null;
+    }
+    if (this.reconnectPending) {
+      this.logger.debug(_scope, 'overriding existing reconnect retry');
+      clearTimeout(this.reconnectPending);
+    }
+    return new Promise((resolve, reject) => {
+      this.reconnectPending = setTimeout(async () => {
+        try {
+          delete this.reconnectPending;
+          this.connection = await this.db.connect({
+            direct: true,
+            onLost: this._onConnectionLostBound,
+          });
+          this.connection.client.on(this.notificationEventName, this._onNotificationBound);
+          await this.connection.none('LISTEN $(channel:name)', { channel: this.options.channel });
+          this.logger.debug(_scope, 'listener connection established');
+          await this.options.connectionEstablishedCallback();
+          resolve();
+        } catch (e) {
+          if (retriesRemaining <= 0) {
+            return reject(e);
+          }
+          try {
+            await this._reconnect(delay, retriesRemaining - 1);
+            resolve();
+          } catch (e2) {
+            reject(e2);
+          }
+        }
+      }, delay);
+    });
+  }
+
+}
+
+module.exports = PostgresListener;
\ No newline at end of file
diff --git a/test/src/db/postgres-listener.js b/test/src/db/postgres-listener.js
new file mode 100644
index 0000000..7926746
--- /dev/null
+++ b/test/src/db/postgres-listener.js
@@ -0,0 +1,197 @@
+/* eslint-env mocha */
+'use strict';
+
+const assert = require('assert');
+const sinon = require('sinon');
+const stubLogger = require('../../stub-logger');
+const Listener = require('../../../src/db/postgres/listener');
+
+const snooze = async (ms) => new Promise((resolve) => setTimeout(resolve, ms));
+const noExpectedException = 'did not get expected exception';
+
+describe('Postgres Listener', function () {
+  let listener, options, connectionStub, pgpStub;
+  beforeEach(function () {
+    connectionStub = {
+      client: {
+        on: sinon.stub(),
+        removeListener: sinon.stub(),
+      },
+      done: sinon.stub(),
+      none: sinon.stub(),
+    };
+    pgpStub = {
+      connect: sinon.stub().resolves(connectionStub),
+    };
+    options = {
+      dataCallback: sinon.stub(),
+      connectionLostCallback: sinon.stub(),
+      connectionEstablishedCallback: sinon.stub(),
+      pingDelayMs: 100,
+      reconnectDelayMs: 1000,
+      reconnectTimes: 1,
+    };
+    listener = new Listener(stubLogger, pgpStub, options);
+  });
+  afterEach(function () {
+    sinon.restore();
+  });
+
+  describe('start', function () {
+    it('covers', async function () {
+      sinon.stub(listener, '_reconnect').resolves();
+      sinon.stub(listener, '_sendPing').resolves();
+      await listener.start();
+      assert(listener._reconnect.called);
+      assert(listener._sendPing.called);
+    });
+  }); // start
+
+  describe('stop', function () {
+    it('covers not started', async function () {
+      await listener.stop();
+    });
+    it('cancels pending reconnect', async function () {
+      const pendingReconnect = sinon.stub();
+      listener.reconnectPending = setTimeout(pendingReconnect, 100);
+      await listener.stop();
+      await snooze(110);
+      assert(!pendingReconnect.called);
+    });
+    it('closes existing connection', async function () {
+      listener.connection = connectionStub;
+      await listener.stop();
+      assert(connectionStub.client.removeListener.called);
+      assert.strictEqual(listener.connection, null);
+      assert(options.connectionLostCallback.called);
+    });
+  }); // stop
+
+  describe('_reconnect', function () {
+    it('reconnects', async function () {
+      await listener._reconnect(0, 1);
+      assert(listener.connection);
+      assert(options.connectionEstablishedCallback.called);
+    });
+    it('closes existing connection before reconnecting', async function () {
+      const existingConnection = {
+        done: sinon.stub(),
+      };
+      listener.connection = existingConnection;
+      await listener._reconnect(0, 1);
+      assert(existingConnection.done.called);
+    });
+    it('overrides a pending reconnect', async function () {
+      this.slow(300);
+      const pendingReconnect = sinon.stub();
+      listener.reconnectPending = setTimeout(pendingReconnect, 100);
+      await listener._reconnect(0, 1);
+      await snooze(110);
+      assert(!pendingReconnect.called);
+    });
+    it('fails with no remaining retries', async function () {
+      const expected = new Error('foo');
+      pgpStub.connect = sinon.stub().rejects(expected);
+      try {
+        await listener._reconnect(0, 0);
+        assert.fail(noExpectedException);
+      } catch (e) {
+        assert.deepStrictEqual(e, expected);
+      }
+    });
+    it('fails all remaining retries', async function () {
+      const expected = new Error('foo');
+      pgpStub.connect = sinon.stub().rejects(expected);
+      try {
+        await listener._reconnect(0, 1);
+        assert.fail(noExpectedException);
+      } catch (e) {
+        assert.deepStrictEqual(e, expected);
+      }
+    });
+    it('fails first retry', async function () {
+      const expected = new Error('foo');
+      pgpStub.connect = sinon.stub().onCall(0).rejects(expected).resolves(connectionStub);
+      await listener._reconnect(0, 1);
+      assert(options.connectionEstablishedCallback.called);
+    });
+  }); // _reconnect
+
+  describe('_onConnectionLost', function () {
+    let error, event;
+    beforeEach(function () {
+      error = new Error('blah');
+      event = connectionStub;
+      sinon.stub(listener, '_reconnect');
+    });
+    it('success', async function () {
+      await listener._onConnectionLost(error, event);
+      assert.strictEqual(listener.connection, null);
+      assert(event.client.removeListener.called);
+      assert(listener.options.connectionLostCallback.called);
+      assert(listener._reconnect.called);
+    });
+    it('covers reconnect failure', async function () {
+      listener._reconnect.rejects(error);
+      await listener._onConnectionLost(error, event);
+      assert.strictEqual(listener.connection, null);
+      assert(event.client.removeListener.called);
+      assert(listener.options.connectionLostCallback.called);
+      assert(listener._reconnect.called);
+    });
+    it('covers listener removal failure', async function () {
+      event.client.removeListener.throws(error);
+      await listener._onConnectionLost(error, event);
+      assert.strictEqual(listener.connection, null);
+      assert(event.client.removeListener.called);
+      assert(listener.options.connectionLostCallback.called);
+      assert(listener._reconnect.called);
+    });
+  }); // _onConnectionLost
+
+  describe('_onNotification', function () {
+    it('sends data', async function () {
+      const data = {
+        payload: 'foo',
+      };
+      await listener._onNotification(data);
+      assert(listener.options.dataCallback.called);
+    });
+    it('ignores pings', async function () {
+      const data = {
+        payload: 'ping',
+      };
+      await listener._onNotification(data);
+      assert(!listener.options.dataCallback.called);
+    });
+  }); // _onNotification
+
+  describe('_sendPing', function () {
+    it('covers no connection', async function () {
+      this.slow(300);
+      await listener._sendPing();
+      await snooze(110);
+      clearTimeout(listener.nextPingTimeout);
+    });
+    it('success', async function () {
+      this.slow(300);
+      listener.connection = connectionStub;
+      await listener._sendPing();
+      await snooze(110);
+      clearTimeout(listener.nextPingTimeout);
+      assert(connectionStub.none.called);
+    });
+    it('covers error', async function () {
+      const err = new Error('blah');
+      this.slow(300);
+      listener.connection = connectionStub;
+      listener.connection.none.rejects(err);
+      await listener._sendPing();
+      await snooze(110);
+      clearTimeout(listener.nextPingTimeout);
+      assert(listener.connection.none.called);
+    });
+  }); // _sendPing
+
+}); // Postgres Listener
diff --git a/test/src/db/postgres.js b/test/src/db/postgres.js
index 5aff1e8..ef47905 100644
--- a/test/src/db/postgres.js
+++ b/test/src/db/postgres.js
@@ -70,6 +70,13 @@ describe('DatabasePostgres', function () {
     sinon.restore();
   });
 
+  it('covers listener', function () {
+    const listenerOptions = new Config('test');
+    listenerOptions.db.cacheEnabled = true;
+    const listenerDb = new DB(stubLogger, listenerOptions, pgpStub);
+    assert(listenerDb);
+  });
+
   // Ensure all interface methods are implemented
   describe('Implementation', function () {
     it('implements interface', async function () {
@@ -104,6 +111,11 @@
       db.pgpInitOptions.query(event);
       assert(db.logger.debug.called);
     });
+    it('covers NOTIFY', function () {
+      const event = { query: 'NOTIFY thing' };
+      db.pgpInitOptions.query(event);
+      assert(!db.logger.debug.called);
+    });
   }); // query
   describe('receive', function () {
     it('covers', function () {
@@ -133,6 +145,35 @@
       assert(db.logger.debug.called);
       assert.deepStrictEqual(data, expectedData);
     });
+    it('covers NOTIFY', function () {
+      const data = [
+        {
+          column_one: 'one', // eslint-disable-line camelcase
+          column_two: 2, // eslint-disable-line camelcase
+        },
+        {
+          column_one: 'foo', // eslint-disable-line camelcase
+          column_two: 4, // eslint-disable-line camelcase
+        },
+      ];
+      const result = {
+        command: 'NOTIFY',
+      };
+      const event = {};
+      const expectedData = [
+        {
+          columnOne: 'one',
+          columnTwo: 2,
+        },
+        {
+          columnOne: 'foo',
+          columnTwo: 4,
+        },
+      ];
+      db.pgpInitOptions.receive(data, result, event);
+      assert(!db.logger.debug.called);
+      assert.deepStrictEqual(data, expectedData);
+    });
   }); // receive
 }); // pgpInitOptions
 
@@ -156,6 +197,9 @@
   }); // _initTables
 
   describe('initialize', function () {
+    after(function () {
+      delete db.listener;
+    });
     it('passes supported version', async function () {
       const version = { major: 1, minor: 0, patch: 0 };
       sinon.stub(db.db, 'one').resolves(version);
@@ -188,6 +232,15 @@
       sinon.stub(db.db, 'one').resolves(db.schemaVersionsSupported.max);
       await db.initialize();
     });
+    it('covers listener', async function () {
+      db.listener = {
+        start: sinon.stub(),
+      };
+      const version = { major: 1, minor: 0, patch: 0 };
+      sinon.stub(db.db, 'one').resolves(version);
+      await db.initialize(false);
+      assert(db.listener.start.called);
+    });
   }); // initialize
 
   describe('healthCheck', function () {
@@ -228,6 +281,9 @@
   }); // _queryFileHelper
 
   describe('_closeConnection', function () {
+    after(function () {
+      delete db.listener;
+    });
     it('success', async function () {
       sinon.stub(db._pgp, 'end');
       await db._closeConnection();
@@ -243,6 +299,14 @@
         assert.deepStrictEqual(e, expected);
       }
     });
+    it('covers listener', async function () {
+      db.listener = {
+        stop: sinon.stub(),
+      };
+      sinon.stub(db._pgp, 'end');
+      await db._closeConnection();
+      assert(db._pgp.end.called);
+    });
   }); // _closeConnection
 
   describe('_purgeTables', function () {
@@ -268,6 +332,84 @@
       });
     });
   }); // _purgeTables
+
+  describe('_topicChanged', function () {
+    beforeEach(function () {
+      db.cache = new Map();
+      sinon.stub(db.cache, 'delete');
+    });
+    after(function () {
+      delete db.cache;
+    });
+    it('covers', function () {
+      db._topicChanged('topic-id');
+      assert(db.cache.delete.called);
+    });
+    it('ignores ping', function () {
+      db._topicChanged('ping');
+      assert(!db.cache.delete.called);
+    });
+  }); // _topicChanged
+
+  describe('_listenerEstablished', function () {
+    it('creates cache', function () {
+      delete db.cache;
+      db._listenerEstablished();
+      assert(db.cache instanceof Map);
+    });
+  }); // _listenerEstablished
+
+  describe('_listenerLost', function () {
+    it('removes cache', function () {
+      db.cache = new Map();
+      db._listenerLost();
+      assert(!db.cache);
+    });
+  }); // _listenerLost
+
+  describe('_cacheGet', function () {
+    let key;
+    beforeEach(function () {
+      key = 'key';
+    });
+    it('nothing if no cache', function () {
+      delete db.cache;
+      const result = db._cacheGet(key);
+      assert.strictEqual(result, undefined);
+    });
+    it('nothing if no entry', function () {
+      db.cache = new Map();
+      const result = db._cacheGet(key);
+      assert.strictEqual(result, undefined);
+    });
+    it('returns cached entry', function () {
+      db.cache = new Map();
+      const expected = {
+        foo: 'bar',
+      };
+      db._cacheSet(key, expected);
+      const result = db._cacheGet(key);
+      assert.deepStrictEqual(result, expected);
+    });
+  }); // _cacheGet
+
+  describe('_cacheSet', function () {
+    let key;
+    beforeEach(function () {
+      key = 'key';
+    });
+    it('covers no cache', function () {
+      delete db.cache;
+      db._cacheSet(key, 'data');
+    });
+    it('covers cache', function () {
+      db.cache = new Map();
+      const expected = 'blah';
+      db._cacheSet(key, expected);
+      const result = db._cacheGet(key);
+      assert.deepStrictEqual(result, expected);
+    });
+  }); // _cacheSet
 
   describe('context', function () {
     it('covers', async function () {
       await db.context(common.nop);
@@ -1024,8 +1166,15 @@
   }); // topicGetByUrl
 
   describe('topicGetContentById', function () {
+    let topic;
+    beforeEach(function () {
+      delete db.cache;
+      topic = {
+        id: topicId,
+      };
+    });
     it('success', async function() {
-      const expected = { id: topicId };
+      const expected = topic;
       sinon.stub(db.db, 'oneOrNone').resolves(expected);
       const result = await db.topicGetContentById(dbCtx, topicId);
       assert.deepStrictEqual(result, expected);
@@ -1046,6 +1195,23 @@
         assert.deepStrictEqual(e, expected);
       }
     });
+    it('caches success', async function () {
+      db.cache = new Map();
+      const expected = topic;
+      sinon.stub(db.db, 'oneOrNone').resolves(expected);
+      const result = await db.topicGetContentById(dbCtx, topicId);
+      assert.deepStrictEqual(result, expected);
+    });
+    it('covers cached entry', async function() {
+      let result;
+      db.cache = new Map();
+      const expected = topic;
+      sinon.stub(db.db, 'oneOrNone').resolves(expected);
+      result = await db.topicGetContentById(dbCtx, topicId);
+      assert.deepStrictEqual(result, expected);
+      result = await db.topicGetContentById(dbCtx, topicId);
+      assert.deepStrictEqual(result, expected);
+    });
   }); // topicGetContentById
 
   describe('topicSet', function () {
@@ -1185,7 +1351,7 @@
       }
     });
 
-  });
+  }); // topicUpdate
 
   describe('verificationClaim', function () {
     it('success', async function() {
-- 
2.43.2