add caching for topic content db calls (Postgres only)
authorJustin Wind <justin.wind+git@gmail.com>
Thu, 5 Aug 2021 22:11:21 +0000 (15:11 -0700)
committerJustin Wind <justin.wind+git@gmail.com>
Fri, 6 Aug 2021 00:17:29 +0000 (17:17 -0700)
Cache topicContentGetById responses, to avoid many large-payload db
calls when updating subscribers.  Currently only enabled for Postgres,
which uses the LISTEN/NOTIFY mechanism on topic updates to invalidate
cache entries, ensuring data consistency.

bin/authUserAdd.js
config/default.js
config/test.js
src/db/postgres/index.js
src/db/postgres/listener.js [new file with mode: 0644]
test/src/db/postgres-listener.js [new file with mode: 0644]
test/src/db/postgres.js

index 4eaff65b9597995bc1dd5bb2ff08b804c22ac62b..1c33be8566d158aa2aaa37d1578b7fb94a180d02 100644 (file)
@@ -44,6 +44,7 @@ async function readPassword(prompt) {
 }
 
 (async () => {
+  await db.initialize();
   const password = await readPassword('password: ');
   const credential = await argon2.hash(password, { type: argon2.argon2id });
   console.log(`\t${identifier}:${credential}`);
index 5c7091fce7ce87befc70a54bc1146856d141f4dd..1905315cec7a1b69eb0e962513923745ff05db64 100644 (file)
@@ -22,6 +22,12 @@ const defaultOptions = {
   db: {
     connectionString: '', // e.g. sqlite://path/to/dbfile.sqlite
     queryLogLevel: undefined, // Set to log queries
+    cacheEnabled: true, // Cache some db responses. (Postgres only)
+    listener: { // Settings for the cache-invalidator connection. (Postgres only)
+      // pingDelayMs: 5000, // Connection keep-alive/health-check.
+      // reconnectDelayMs: 6000, // Wait time before attempting reconnection.
+      // reconnectTimes: 10, // Retries limit.
+    },
   },
 
   // Logging options
index 091b6c0a636d7712d7f567ddf4b65d1de0b7ca3d..3ff1259606585a8d05e19ac7681b45c1d2af0ebe 100644 (file)
@@ -8,5 +8,6 @@ module.exports = {
   },
   db: {
     queryLogLevel: 'debug',
+    cacheEnabled: false,
   },
 };
index d02d98165360f33819e220b195e3af0d18c26cd2..e6b5f5b391b9fbb29992b834b6a6a6ad6dde1bd5 100644 (file)
@@ -10,6 +10,7 @@ const pgp = require('pg-promise')(pgpInitOptions);
 const svh = require('../schema-version-helper');
 const Database = require('../base');
 const DBErrors = require('../errors');
+const Listener = require('./listener');
 const common = require('../../common');
 
 const _fileScope = common.fileScope(__filename);
@@ -43,10 +44,23 @@ class DatabasePostgres extends Database {
     // Suppress QF warnings when running tests
     this.noWarnings = options.db.noWarnings;
 
+    if (options.db.cacheEnabled) {
+      this.listener = new Listener(logger, this.db, Object.assign({}, options.db.listener, {
+        channel: 'topic_changed',
+        dataCallback: this._topicChanged.bind(this),
+        connectionEstablishedCallback: this._listenerEstablished.bind(this),
+        connectionLostCallback: this._listenerLost.bind(this),
+      }));
+    }
+
     // Log queries
     const queryLogLevel = options.db.queryLogLevel;
     if (queryLogLevel) {
       pgpInitOptions.query = (event) => {
+        // Quell outgoing pings
+        if (event && event.query && event.query.startsWith('NOTIFY')) {
+          return;
+        }
         this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event, ['query', 'params']) });
       };
     }
@@ -69,6 +83,10 @@ class DatabasePostgres extends Database {
         }
       }
       if (queryLogLevel) {
+        // Quell outgoing pings
+        if (result && result.command === 'NOTIFY') {
+          return;
+        }
         // Omitting .rows
         const resultLog = common.pick(result, ['command', 'rowCount', 'duration']);
         this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog });
@@ -86,6 +104,7 @@ class DatabasePostgres extends Database {
   _queryFileHelper(_pgp) {
     return (file) => {
       const _scope = _fileScope('_queryFile');
+      /* istanbul ignore next */
       const qfParams = {
         minify: true,
         ...(this.noWarnings && { noWarnings: this.noWarnings }),
@@ -107,6 +126,9 @@ class DatabasePostgres extends Database {
       await this._initTables();
     }
     await super.initialize();
+    if (this.listener) {
+      await this.listener.start();
+    }
   }
 
 
@@ -173,6 +195,9 @@ class DatabasePostgres extends Database {
   async _closeConnection() {
     const _scope = _fileScope('_closeConnection');
     try {
+      if (this.listener) {
+        await this.listener.stop();
+      }
       await this._pgp.end();
     } catch (e) {
       this.logger.error(_scope, 'failed', { error: e });
@@ -181,6 +206,7 @@ class DatabasePostgres extends Database {
   }
 
   
+  /* istanbul ignore next */
   async _purgeTables(really = false) {
     const _scope = _fileScope('_purgeTables');
     try {
@@ -219,6 +245,77 @@ class DatabasePostgres extends Database {
   }
 
 
+  /**
+   * Receive notices when topic entry is updated.
+   * Clear relevant cache entry.
+   * @param {String} payload
+   */
+  _topicChanged(payload) {
+    const _scope = _fileScope('_topicChanged');
+    if (payload !== 'ping') {
+      this.logger.debug(_scope, 'called', { payload });
+      this.cache.delete(payload);
+    }
+  }
+
+
+  /**
+   * Called when a listener connection is opened.
+   * Enable cache.
+   */
+  _listenerEstablished() {
+    const _scope = _fileScope('_listenerEstablished');
+    this.logger.debug(_scope, 'called', {});
+    this.cache = new Map();
+  }
+
+
+  /**
+   * Called when a listener connection is closed.
+   * Disable cache.
+   */
+  _listenerLost() {
+    const _scope = _fileScope('_listenerLost');
+    this.logger.debug(_scope, 'called', {});
+    delete this.cache;
+  }
+
+
+  /**
+   * Return a cached entry, if available.
+   * @param {*} key
+   */
+  _cacheGet(key) {
+    const _scope = _fileScope('_cacheGet');
+    if (this.cache && this.cache.has(key)) {
+      const cacheEntry = this.cache.get(key);
+      this.logger.debug(_scope, 'found cache entry', { key, ...common.pick(cacheEntry, ['added', 'hits', 'lastHit']) });
+      cacheEntry.hits += 1;
+      cacheEntry.lastHit = new Date();
+      return cacheEntry.data;
+    }
+  }
+
+
+  /**
+   * Store an entry in cache, if available.
+   * @param {*} key
+   * @param {*} data
+   */
+  _cacheSet(key, data) {
+    const _scope = _fileScope('_cacheSet');
+    if (this.cache) {
+      this.cache.set(key, {
+        added: new Date(),
+        hits: 0,
+        lastHit: undefined,
+        data,
+      });
+      this.logger.debug(_scope, 'added cache entry', { key });
+    }
+  }
+
+
   async context(fn) {
     return this.db.task(async (t) => fn(t));
   }
@@ -692,8 +789,14 @@ class DatabasePostgres extends Database {
 
     let topic;
     try {
+      topic = this._cacheGet(topicId);
+      if (topic) {
+        return topic;
+      }
       topic = await dbCtx.oneOrNone(this.statement.topicGetContentById, { topicId });
-      return this._topicDefaults(topic);
+      const topicWithDefaults = this._topicDefaults(topic);
+      this._cacheSet(topicId, topicWithDefaults);
+      return topicWithDefaults;
     } catch (e) {
       this.logger.error(_scope, 'failed', { error: e, topic, topicId });
       throw e;
diff --git a/src/db/postgres/listener.js b/src/db/postgres/listener.js
new file mode 100644 (file)
index 0000000..6ad387a
--- /dev/null
@@ -0,0 +1,169 @@
+'use strict';
+
+const common = require('../../common');
+
+const _fileScope = common.fileScope(__filename);
+
+
+const defaultOptions = {
+  channel: 'cache_invalidation',
+  dataCallback: common.nop,
+  connectionLostCallback: common.nop,
+  connectionEstablishedCallback: common.nop,
+  pingDelayMs: 5000,
+  reconnectDelayMs: 6000,
+  reconnectTimes: 10,
+};
+
+/**
+ * Create a robust connection which listens to a notification channel.
+ */
+class PostgresListener {
+  constructor(logger, db, options) {
+    this.logger = logger;
+    this.db = db;
+
+    this.options = Object.assign({}, defaultOptions, options);
+    this.notificationEventName = 'notification';
+
+    this.connection = null;
+    this.nextPingTimeout = undefined;
+
+    this._onConnectionLostBound = this._onConnectionLost.bind(this);
+    this._onNotificationBound = this._onNotification.bind(this);
+  }
+
+
+  /**
+   * Establish the listener connection.
+   */
+  async start() {
+    await this._reconnect(0, 1);
+    this._sendPing();
+  }
+
+
+  /**
+   * Shut down the listener connection.
+   */
+  async stop() {
+    const _scope = _fileScope('stop');
+    if (this.reconnectPending) {
+      this.logger.debug(_scope, 'overriding existing reconnect retry');
+      clearTimeout(this.reconnectPending);
+      delete this.reconnectPending;
+    }
+    if (this.connection) {
+      this.connection.client.removeListener(this.notificationEventName, this.onNotificationBound);
+      this.connection.done();
+      this.connection = null;
+      await this.options.connectionLostCallback();
+    }
+  }
+
+
+  /**
+   * Begin sending connection pings.
+   */
+  _sendPing() {
+    const _scope = _fileScope('_sendPing');
+    this.nextPingTimeout = setTimeout(async () => {
+      try {
+        if (this.connection) {
+          await this.connection.none('NOTIFY $(channel:name), $(payload)', { channel: this.options.channel, payload: 'ping' });
+        }
+      } catch (e) {
+        this.logger.error(_scope, 'failed', { error: e });
+      } finally {
+        this._sendPing();
+      }
+    }, this.options.pingDelayMs);
+  }
+
+
+  /**
+   * Notify callback.
+   * @param {Object} data
+   */
+  async _onNotification(data) {
+    const _scope = _fileScope('_onNotification');
+    // Ignore our own messages
+    if (data.payload === 'ping') {
+      return;
+    }
+    this.logger.debug(_scope, 'called', data);
+    await this.options.dataCallback(data.payload);
+  }
+
+
+  /**
+   * Notify callback and attempt to reconnect.
+   * @param {*} error
+   * @param {*} event
+   */
+  async _onConnectionLost(error, event) {
+    const _scope = _fileScope('_onConnectionLost');
+    this.logger.error(_scope, 'listener connection lost', { error, event });
+    this.connection = null;
+    try {
+      event.client.removeListener(this.notificationEventName, this.onNotificationBound);
+    } catch (e) {
+      this.logger.error(_scope, 'failed to remove listener', { error: e });
+      // That's okay, it was probably just gone anyhow.
+    }
+    await this.options.connectionLostCallback();
+    try {
+      await this._reconnect(this.options.reconnectDelayMs, this.options.reconnectTimes);
+    } catch (e) {
+      this.logger.error(_scope, 'failed to reconnect listener', { error: e });
+    }
+  }
+
+
+  /**
+   * Schedule an attempt to establish a connection.
+   * @param {Number} delay
+   * @param {Number} retriesRemaining
+   */
+  async _reconnect(delay, retriesRemaining) {
+    const _scope = _fileScope('_reconnect');
+    if (this.connection) {
+      this.logger.debug(_scope, 'closing existing connection');
+      this.connection.done();
+      this.connection = null;
+    }
+    if (this.reconnectPending) {
+      this.logger.debug(_scope, 'overriding existing reconnect retry');
+      clearTimeout(this.reconnectPending);
+    }
+    return new Promise((resolve, reject) => {
+      this.reconnectPending = setTimeout(async () => {
+        try {
+          delete this.reconnectPending;
+          this.connection = await this.db.connect({
+            direct: true,
+            onLost: this._onConnectionLostBound,
+          });
+          this.connection.client.on(this.notificationEventName, this._onNotificationBound);
+          await this.connection.none('LISTEN $(channel:name)', { channel: this.options.channel });
+          this.logger.debug(_scope, 'listener connection established');
+          await this.options.connectionEstablishedCallback();
+          resolve();
+        } catch (e) {
+          if (retriesRemaining <= 0) {
+            return reject(e);
+          }
+          try {
+            await this._reconnect(delay, retriesRemaining - 1);
+            resolve();
+          } catch (e2) {
+            reject(e2);
+          }
+        }
+      }, delay);
+    });
+  }
+
+}
+
+module.exports = PostgresListener;
\ No newline at end of file
diff --git a/test/src/db/postgres-listener.js b/test/src/db/postgres-listener.js
new file mode 100644 (file)
index 0000000..7926746
--- /dev/null
@@ -0,0 +1,197 @@
+/* eslint-env mocha */
+'use strict';
+
+const assert = require('assert');
+const sinon = require('sinon');
+const stubLogger = require('../../stub-logger');
+const Listener = require('../../../src/db/postgres/listener');
+
+const snooze = async (ms) => new Promise((resolve) => setTimeout(resolve, ms));
+const noExpectedException = 'did not get expected exception';
+
+describe('Postgres Listener', function () {
+  let listener, options, connectionStub, pgpStub;
+  beforeEach(function () {
+    connectionStub = {
+      client: {
+        on: sinon.stub(),
+        removeListener: sinon.stub(),
+      },
+      done: sinon.stub(),
+      none: sinon.stub(),
+    };
+    pgpStub = {
+      connect: sinon.stub().resolves(connectionStub),
+    };
+    options = {
+      dataCallback: sinon.stub(),
+      connectionLostCallback: sinon.stub(),
+      connectionEstablishedCallback: sinon.stub(),
+      pingDelayMs: 100,
+      reconnectDelayMs: 1000,
+      reconnectTimes: 1,
+    };
+    listener = new Listener(stubLogger, pgpStub, options);
+  });
+  afterEach(function () {
+    sinon.restore();
+  });
+
+  describe('start', function () {
+    it('covers', async function () {
+      sinon.stub(listener, '_reconnect').resolves();
+      sinon.stub(listener, '_sendPing').resolves();
+      await listener.start();
+      assert(listener._reconnect.called);
+      assert(listener._sendPing.called);
+    });
+  }); // start
+
+  describe('stop', function () {
+    it('covers not started', async function () {
+      await listener.stop();
+    });
+    it('cancels pending reconnect', async function() {
+      const pendingReconnect = sinon.stub();
+      listener.reconnectPending = setTimeout(pendingReconnect, 100);
+      await listener.stop();
+      snooze(110);
+      assert(!pendingReconnect.called);
+    });
+    it('closes existing connection', async function () {
+      listener.connection = connectionStub;
+      await listener.stop();
+      assert(connectionStub.client.removeListener.called);
+      assert.strictEqual(listener.connection, null);
+      assert(options.connectionLostCallback.called);
+    });
+  }); // stop
+
+  describe('_reconnect', function () {
+    it('reconnects', async function () {
+      await listener._reconnect(0, 1);
+      assert(listener.connection);
+      assert(options.connectionEstablishedCallback.called);
+    });
+    it('closes existing connection before reconnecting', async function () {
+      const existingConnection = {
+        done: sinon.stub(),
+      };
+      listener.connection = existingConnection;
+      await listener._reconnect(0, 1);
+      assert(existingConnection.done.called);
+    });
+    it('overrides a pending reconnect', async function () {
+      this.slow(300);
+      const pendingReconnect = sinon.stub();
+      listener.reconnectPending = setTimeout(pendingReconnect, 100);
+      await listener._reconnect(0, 1);
+      await snooze(110);
+      assert(!pendingReconnect.called);
+    });
+    it('fails with no remaining retries', async function () {
+      const expected = new Error('foo');
+      pgpStub.connect = sinon.stub().rejects(expected);
+      try {
+        await listener._reconnect(0, 0);
+        assert.fail(noExpectedException);
+      } catch (e) {
+        assert.deepStrictEqual(e, expected);
+      }
+    });
+    it('fails all remaining retries', async function () {
+      const expected = new Error('foo');
+      pgpStub.connect = sinon.stub().rejects(expected);
+      try {
+        await listener._reconnect(0, 1);
+        assert.fail(noExpectedException);
+      } catch (e) {
+        assert.deepStrictEqual(e, expected);
+      }
+    });
+    it('fails first retry', async function () {
+      const expected = new Error('foo');
+      pgpStub.connect = sinon.stub().onCall(0).rejects(expected).resolves(connectionStub);
+      await listener._reconnect(0, 1);
+      assert(options.connectionEstablishedCallback.called);
+    });
+  }); // _reconnect
+
+  describe('_onConnectionLost', function () {
+    let error, event;
+    beforeEach(function () {
+      error = new Error('blah');
+      event = connectionStub;
+      sinon.stub(listener, '_reconnect');
+    });
+    it('success', async function () {
+      await listener._onConnectionLost(error, event);
+      assert.strictEqual(listener.connection, null);
+      assert(event.client.removeListener.called);
+      assert(listener.options.connectionLostCallback.called);
+      assert(listener._reconnect.called);
+    });
+    it('covers reconnect failure', async function () {
+      listener._reconnect.rejects(error);
+      await listener._onConnectionLost(error, event);
+      assert.strictEqual(listener.connection, null);
+      assert(event.client.removeListener.called);
+      assert(listener.options.connectionLostCallback.called);
+      assert(listener._reconnect.called);
+    });
+    it('covers listener removal failure', async function () {
+      event.client.removeListener.throws(error);
+      await listener._onConnectionLost(error, event);
+      assert.strictEqual(listener.connection, null);
+      assert(event.client.removeListener.called);
+      assert(listener.options.connectionLostCallback.called);
+      assert(listener._reconnect.called);
+    });
+  }); // _onConnectionLost
+
+  describe('_onNotification', function () {
+    it('sends data', async function () {
+      const data = {
+        payload: 'foo',
+      };
+      await listener._onNotification(data);
+      assert(listener.options.dataCallback.called);
+    });
+    it('ignores pings', async function () {
+      const data = {
+        payload: 'ping',
+      };
+      await listener._onNotification(data);
+      assert(!listener.options.dataCallback.called);
+    });
+  }); // _onNotification
+
+  describe('_sendPing', function () {
+    it('covers no connection', async function () {
+      this.slow(300);
+      await listener._sendPing();
+      await snooze(110);
+      clearTimeout(listener.nextPingTimeout);
+    });
+    it('success', async function () {
+      this.slow(300);
+      listener.connection = connectionStub;
+      await listener._sendPing();
+      await snooze(110);
+      clearTimeout(listener.nextPingTimeout);
+      assert(connectionStub.none.called);
+    });
+    it('covers error', async function () {
+      const err = new Error('blah');
+      this.slow(300);
+      listener.connection = connectionStub;
+      listener.connection.none.rejects(err);
+      await listener._sendPing();
+      await snooze(110);
+      clearTimeout(listener.nextPingTimeout);
+      assert(listener.connection.none.called);
+
+    });
+  }); // _sendPing
+
+}); // Postgres Listener
index 5aff1e8cceafae1e8faffd3629105cf59a1955ca..ef4790542cd147a076a0fdc11985da6641f10b64 100644 (file)
@@ -70,6 +70,13 @@ describe('DatabasePostgres', function () {
     sinon.restore();
   });
 
+  it('covers listener', function () {
+    const listenerOptions = new Config('test');
+    listenerOptions.db.cacheEnabled = true;
+    const listenerDb = new DB(stubLogger, listenerOptions, pgpStub);
+    assert(listenerDb);
+  });
+
   // Ensure all interface methods are implemented
   describe('Implementation', function () {
     it('implements interface', async function () {
@@ -104,6 +111,11 @@ describe('DatabasePostgres', function () {
         db.pgpInitOptions.query(event);
         assert(db.logger.debug.called);
       });
+      it('covers NOTIFY', function () {
+        const event = { query: 'NOTIFY thing' };
+        db.pgpInitOptions.query(event);
+        assert(!db.logger.debug.called);
+      });
     }); // query
     describe('receive', function () {
       it('covers', function () {
@@ -133,6 +145,35 @@ describe('DatabasePostgres', function () {
         assert(db.logger.debug.called);
         assert.deepStrictEqual(data, expectedData);
       });
+      it('covers NOTIFY', function () {
+        const data = [
+          {
+            column_one: 'one', // eslint-disable-line camelcase
+            column_two: 2, // eslint-disable-line camelcase
+          },
+          {
+            column_one: 'foo', // eslint-disable-line camelcase
+            column_two: 4, // eslint-disable-line camelcase
+          },
+        ];
+        const result = {
+          command: 'NOTIFY',
+        };
+        const event = {};
+        const expectedData = [
+          {
+            columnOne: 'one',
+            columnTwo: 2,
+          },
+          {
+            columnOne: 'foo',
+            columnTwo: 4,
+          },
+        ];
+        db.pgpInitOptions.receive(data, result, event)
+        assert(!db.logger.debug.called);
+        assert.deepStrictEqual(data, expectedData);
+      });
     }); // receive
   }); // pgpInitOptions
 
@@ -156,6 +197,9 @@ describe('DatabasePostgres', function () {
   }); // _initTables
 
   describe('initialize', function () {
+    after(function () {
+      delete db.listener;
+    });
     it('passes supported version', async function () {
       const version = { major: 1, minor: 0, patch: 0 };
       sinon.stub(db.db, 'one').resolves(version);
@@ -188,6 +232,15 @@ describe('DatabasePostgres', function () {
       sinon.stub(db.db, 'one').resolves(db.schemaVersionsSupported.max);
       await db.initialize();
     });
+    it('covers listener', async function() {
+      db.listener = {
+        start: sinon.stub(),
+      };
+      const version = { major: 1, minor: 0, patch: 0 };
+      sinon.stub(db.db, 'one').resolves(version);
+      await db.initialize(false);
+      assert(db.listener.start.called);
+    });
   }); // initialize
 
   describe('healthCheck', function () {
@@ -228,6 +281,9 @@ describe('DatabasePostgres', function () {
   }); // _queryFileHelper
 
   describe('_closeConnection', function () {
+    after(function () {
+      delete db.listener;
+    });
     it('success', async function () {
       sinon.stub(db._pgp, 'end');
       await db._closeConnection();
@@ -243,6 +299,14 @@ describe('DatabasePostgres', function () {
         assert.deepStrictEqual(e, expected);
       }
     });
+    it('covers listener', async function () {
+      db.listener = {
+        stop: sinon.stub(),
+      };
+      sinon.stub(db._pgp, 'end');
+      await db._closeConnection();
+      assert(db._pgp.end.called);
+    });
   }); // _closeConnection
 
   describe('_purgeTables', function () {
@@ -268,6 +332,84 @@ describe('DatabasePostgres', function () {
     });
   }); // _purgeTables
 
+  describe('_topicChanged', function () {
+    beforeEach(function () {
+      db.cache = new Map();
+      sinon.stub(db.cache, 'delete');
+    });
+    after(function () {
+      delete db.cache;
+    });
+    it('covers', function () {
+      db._topicChanged('topic-id');
+      assert(db.cache.delete.called);
+    });
+    it('ignores ping', function () {
+      db._topicChanged('ping');
+      assert(!db.cache.delete.called);
+    });
+  }); // _topicChanged
+
+  describe('_listenerEstablished', function () {
+    it('creates cache', function () {
+      delete db.cache;
+      db._listenerEstablished();
+      assert(db.cache instanceof Map);
+    });
+  }); // _listenerEstablished
+
+  describe('_listenerLost', function () {
+    it('removes cache', function () {
+      db.cache = new Map();
+      db._listenerLost();
+      assert(!db.cache);
+    });
+  }); // _listenerLost
+
+  describe('_cacheGet', function () {
+    let key;
+    beforeEach(function () {
+      key = 'key';
+    });
+    it('nothing if no cache', function () {
+      delete db.cache;
+      const result = db._cacheGet(key);
+      assert.strictEqual(result, undefined);
+    });
+    it('nothing if no entry', function () {
+      db.cache = new Map();
+      const result = db._cacheGet(key);
+      assert.strictEqual(result, undefined);
+    });
+    it('returns cached entry', function () {
+      db.cache = new Map();
+      const expected = {
+        foo: 'bar',
+      };
+      db._cacheSet(key, expected);
+      const result = db._cacheGet(key);
+      assert.deepStrictEqual(result, expected);
+    });
+  }); // _cacheGet
+
+  describe('_cacheSet', function () {
+    let key;
+    beforeEach(function () {
+      key = 'key';
+    });
+    it('covers no cache', function () {
+      delete db.cache;
+      db._cacheSet(key, 'data');
+    });
+    it('covers cache', function () {
+      db.cache = new Map();
+      const expected = 'blah';
+      db._cacheSet(key, expected);
+      const result = db._cacheGet(key);
+      assert.deepStrictEqual(result, expected);
+    });
+  }); // _cacheSet
+
   describe('context', function () {
     it('covers', async function () {
       await db.context(common.nop);
@@ -1024,8 +1166,15 @@ describe('DatabasePostgres', function () {
   }); // topicGetByUrl
 
   describe('topicGetContentById', function () {
+    let topic;
+    beforeEach(function () {
+      delete db.cache;
+      topic = {
+        id: topicId,
+      };
+    });
     it('success', async function() {
-      const expected = { id: topicId };
+      const expected = topic;
       sinon.stub(db.db, 'oneOrNone').resolves(expected);
       const result = await db.topicGetContentById(dbCtx, topicId);
       assert.deepStrictEqual(result, expected);
@@ -1046,6 +1195,23 @@ describe('DatabasePostgres', function () {
         assert.deepStrictEqual(e, expected);
       }
     });
+    it('caches success', async function () {
+      db.cache = new Map();
+      const expected = topic;
+      sinon.stub(db.db, 'oneOrNone').resolves(expected);
+      const result = await db.topicGetContentById(dbCtx, topicId);
+      assert.deepStrictEqual(result, expected);
+    });
+    it('covers cached entry', async function() {
+      let result;
+      db.cache = new Map();
+      const expected = topic;
+      sinon.stub(db.db, 'oneOrNone').resolves(expected);
+      result = await db.topicGetContentById(dbCtx, topicId);
+      assert.deepStrictEqual(result, expected);
+      result = await db.topicGetContentById(dbCtx, topicId);
+      assert.deepStrictEqual(result, expected);
+    });
   }); // topicGetContentById
 
   describe('topicSet', function () {
@@ -1185,7 +1351,7 @@ describe('DatabasePostgres', function () {
       }
     });
 
-  });
+  }); // topicUpdate
 
   describe('verificationClaim', function () {
     it('success', async function() {