}
(async () => {
+ await db.initialize();
const password = await readPassword('password: ');
const credential = await argon2.hash(password, { type: argon2.argon2id });
console.log(`\t${identifier}:${credential}`);
db: {
connectionString: '', // e.g. sqlite://path/to/dbfile.sqlite
queryLogLevel: undefined, // Set to log queries
+ cacheEnabled: true, // Cache some db responses. (Postgres only)
+ listener: { // Settings for the cache-invalidator connection. (Postgres only)
+ // pingDelayMs: 5000, // Connection keep-alive/health-check.
+ // reconnectDelayMs: 6000, // Wait time before attempting reconnection.
+ // reconnectTimes: 10, // Retries limit.
+ },
},
// Logging options
},
db: {
queryLogLevel: 'debug',
+ cacheEnabled: false,
},
};
const svh = require('../schema-version-helper');
const Database = require('../base');
const DBErrors = require('../errors');
+const Listener = require('./listener');
const common = require('../../common');
const _fileScope = common.fileScope(__filename);
// Suppress QF warnings when running tests
this.noWarnings = options.db.noWarnings;
+ if (options.db.cacheEnabled) {
+ this.listener = new Listener(logger, this.db, Object.assign({}, options.db.listener, {
+ channel: 'topic_changed',
+ dataCallback: this._topicChanged.bind(this),
+ connectionEstablishedCallback: this._listenerEstablished.bind(this),
+ connectionLostCallback: this._listenerLost.bind(this),
+ }));
+ }
+
// Log queries
const queryLogLevel = options.db.queryLogLevel;
if (queryLogLevel) {
pgpInitOptions.query = (event) => {
+ // Quell outgoing pings
+ if (event && event.query && event.query.startsWith('NOTIFY')) {
+ return;
+ }
this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event, ['query', 'params']) });
};
}
}
}
if (queryLogLevel) {
+ // Quell outgoing pings
+ if (result && result.command === 'NOTIFY') {
+ return;
+ }
// Omitting .rows
const resultLog = common.pick(result, ['command', 'rowCount', 'duration']);
this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog });
_queryFileHelper(_pgp) {
return (file) => {
const _scope = _fileScope('_queryFile');
+ /* istanbul ignore next */
const qfParams = {
minify: true,
...(this.noWarnings && { noWarnings: this.noWarnings }),
await this._initTables();
}
await super.initialize();
+ if (this.listener) {
+ await this.listener.start();
+ }
}
async _closeConnection() {
const _scope = _fileScope('_closeConnection');
try {
+ if (this.listener) {
+ await this.listener.stop();
+ }
await this._pgp.end();
} catch (e) {
this.logger.error(_scope, 'failed', { error: e });
}
+ /* istanbul ignore next */
async _purgeTables(really = false) {
const _scope = _fileScope('_purgeTables');
try {
}
+ /**
+ * Receive notices when topic entry is updated.
+ * Clear relevant cache entry.
+ * @param {String} payload
+ */
+ _topicChanged(payload) {
+ const _scope = _fileScope('_topicChanged');
+ if (payload !== 'ping') {
+ this.logger.debug(_scope, 'called', { payload });
+ this.cache.delete(payload);
+ }
+ }
+
+
+ /**
+ * Called when a listener connection is opened.
+ * Enable cache.
+ */
+ _listenerEstablished() {
+ const _scope = _fileScope('_listenerEstablished');
+ this.logger.debug(_scope, 'called', {});
+ this.cache = new Map();
+ }
+
+
+ /**
+ * Called when a listener connection is closed.
+ * Disable cache.
+ */
+ _listenerLost() {
+ const _scope = _fileScope('_listenerLost');
+ this.logger.debug(_scope, 'called', {});
+ delete this.cache;
+ }
+
+
+ /**
+ * Return a cached entry, if available.
+ * @param {*} key
+ */
+ _cacheGet(key) {
+ const _scope = _fileScope('_cacheGet');
+ if (this.cache && this.cache.has(key)) {
+ const cacheEntry = this.cache.get(key);
+ this.logger.debug(_scope, 'found cache entry', { key, ...common.pick(cacheEntry, ['added', 'hits', 'lastHit']) });
+ cacheEntry.hits += 1;
+ cacheEntry.lastHit = new Date();
+ return cacheEntry.data;
+ }
+ }
+
+
+ /**
+ * Store an entry in cache, if available.
+ * @param {*} key
+ * @param {*} data
+ */
+ _cacheSet(key, data) {
+ const _scope = _fileScope('_cacheSet');
+ if (this.cache) {
+ this.cache.set(key, {
+ added: new Date(),
+ hits: 0,
+ lastHit: undefined,
+ data,
+ });
+ this.logger.debug(_scope, 'added cache entry', { key });
+ }
+ }
+
+
async context(fn) {
return this.db.task(async (t) => fn(t));
}
let topic;
try {
+ topic = this._cacheGet(topicId);
+ if (topic) {
+ return topic;
+ }
topic = await dbCtx.oneOrNone(this.statement.topicGetContentById, { topicId });
- return this._topicDefaults(topic);
+ const topicWithDefaults = this._topicDefaults(topic);
+ this._cacheSet(topicId, topicWithDefaults);
+ return topicWithDefaults;
} catch (e) {
this.logger.error(_scope, 'failed', { error: e, topic, topicId });
throw e;
--- /dev/null
+'use strict';
+
+const common = require('../../common');
+
+const _fileScope = common.fileScope(__filename);
+
+
+const defaultOptions = {
+ channel: 'cache_invalidation',
+ dataCallback: common.nop,
+ connectionLostCallback: common.nop,
+ connectionEstablishedCallback: common.nop,
+ pingDelayMs: 5000,
+ reconnectDelayMs: 6000,
+ reconnectTimes: 10,
+};
+
+/**
+ * Create a robust connection which listens to a notification channel.
+ */
+class PostgresListener {
+ constructor(logger, db, options) {
+ this.logger = logger;
+ this.db = db;
+
+ this.options = Object.assign({}, defaultOptions, options);
+ this.notificationEventName = 'notification';
+
+ this.connection = null;
+ this.nextPingTimeout = undefined;
+
+ this._onConnectionLostBound = this._onConnectionLost.bind(this);
+ this._onNotificationBound = this._onNotification.bind(this);
+ }
+
+
+ /**
+ * Establish the listener connection.
+ */
+ async start() {
+ await this._reconnect(0, 1);
+ this._sendPing();
+ }
+
+
+ /**
+ * Shut down the listener connection.
+ */
+ async stop() {
+ const _scope = _fileScope('stop');
+ if (this.reconnectPending) {
+ this.logger.debug(_scope, 'overriding existing reconnect retry');
+ clearTimeout(this.reconnectPending);
+ delete this.reconnectPending;
+ }
+ if (this.connection) {
+ this.connection.client.removeListener(this.notificationEventName, this.onNotificationBound);
+ this.connection.done();
+ this.connection = null;
+ await this.options.connectionLostCallback();
+ }
+ }
+
+
+ /**
+ * Begin sending connection pings.
+ */
+ _sendPing() {
+ const _scope = _fileScope('_sendPing');
+ this.nextPingTimeout = setTimeout(async () => {
+ try {
+ if (this.connection) {
+ await this.connection.none('NOTIFY $(channel:name), $(payload)', { channel: this.options.channel, payload: 'ping' });
+ }
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e });
+ } finally {
+ this._sendPing();
+ }
+ }, this.options.pingDelayMs);
+ }
+
+
+ /**
+ * Notify callback.
+ * @param {Object} data
+ */
+ async _onNotification(data) {
+ const _scope = _fileScope('_onNotification');
+ // Ignore our own messages
+ if (data.payload === 'ping') {
+ return;
+ }
+ this.logger.debug(_scope, 'called', data);
+ await this.options.dataCallback(data.payload);
+ }
+
+
+ /**
+ * Notify callback and attempt to reconnect.
+ * @param {*} error
+ * @param {*} event
+ */
+ async _onConnectionLost(error, event) {
+ const _scope = _fileScope('_onConnectionLost');
+ this.logger.error(_scope, 'listener connection lost', { error, event });
+ this.connection = null;
+ try {
+ event.client.removeListener(this.notificationEventName, this.onNotificationBound);
+ } catch (e) {
+ this.logger.error(_scope, 'failed to remove listener', { error: e });
+ // That's okay, it was probably just gone anyhow.
+ }
+ await this.options.connectionLostCallback();
+ try {
+ await this._reconnect(this.options.reconnectDelayMs, this.options.reconnectTimes);
+ } catch (e) {
+ this.logger.error(_scope, 'failed to reconnect listener', { error: e });
+ }
+ }
+
+
+ /**
+ * Schedule an attempt to establish a connection.
+ * @param {Number} delay
+ * @param {Number} retriesRemaining
+ */
+ async _reconnect(delay, retriesRemaining) {
+ const _scope = _fileScope('_reconnect');
+ if (this.connection) {
+ this.logger.debug(_scope, 'closing existing connection');
+ this.connection.done();
+ this.connection = null;
+ }
+ if (this.reconnectPending) {
+ this.logger.debug(_scope, 'overriding existing reconnect retry');
+ clearTimeout(this.reconnectPending);
+ }
+ return new Promise((resolve, reject) => {
+ this.reconnectPending = setTimeout(async () => {
+ try {
+ delete this.reconnectPending;
+ this.connection = await this.db.connect({
+ direct: true,
+ onLost: this._onConnectionLostBound,
+ });
+ this.connection.client.on(this.notificationEventName, this._onNotificationBound);
+ await this.connection.none('LISTEN $(channel:name)', { channel: this.options.channel });
+ this.logger.debug(_scope, 'listener connection established');
+ await this.options.connectionEstablishedCallback();
+ resolve();
+ } catch (e) {
+ if (retriesRemaining <= 0) {
+ return reject(e);
+ }
+ try {
+ await this._reconnect(delay, retriesRemaining - 1);
+ resolve();
+ } catch (e2) {
+ reject(e2);
+ }
+ }
+ }, delay);
+ });
+ }
+
+}
+
+module.exports = PostgresListener;
\ No newline at end of file
--- /dev/null
+/* eslint-env mocha */
+'use strict';
+
+const assert = require('assert');
+const sinon = require('sinon');
+const stubLogger = require('../../stub-logger');
+const Listener = require('../../../src/db/postgres/listener');
+
+const snooze = async (ms) => new Promise((resolve) => setTimeout(resolve, ms));
+const noExpectedException = 'did not get expected exception';
+
+describe('Postgres Listener', function () {
+ let listener, options, connectionStub, pgpStub;
+ beforeEach(function () {
+ connectionStub = {
+ client: {
+ on: sinon.stub(),
+ removeListener: sinon.stub(),
+ },
+ done: sinon.stub(),
+ none: sinon.stub(),
+ };
+ pgpStub = {
+ connect: sinon.stub().resolves(connectionStub),
+ };
+ options = {
+ dataCallback: sinon.stub(),
+ connectionLostCallback: sinon.stub(),
+ connectionEstablishedCallback: sinon.stub(),
+ pingDelayMs: 100,
+ reconnectDelayMs: 1000,
+ reconnectTimes: 1,
+ };
+ listener = new Listener(stubLogger, pgpStub, options);
+ });
+ afterEach(function () {
+ sinon.restore();
+ });
+
+ describe('start', function () {
+ it('covers', async function () {
+ sinon.stub(listener, '_reconnect').resolves();
+ sinon.stub(listener, '_sendPing').resolves();
+ await listener.start();
+ assert(listener._reconnect.called);
+ assert(listener._sendPing.called);
+ });
+ }); // start
+
+ describe('stop', function () {
+ it('covers not started', async function () {
+ await listener.stop();
+ });
+ it('cancels pending reconnect', async function() {
+ const pendingReconnect = sinon.stub();
+ listener.reconnectPending = setTimeout(pendingReconnect, 100);
+ await listener.stop();
+ snooze(110);
+ assert(!pendingReconnect.called);
+ });
+ it('closes existing connection', async function () {
+ listener.connection = connectionStub;
+ await listener.stop();
+ assert(connectionStub.client.removeListener.called);
+ assert.strictEqual(listener.connection, null);
+ assert(options.connectionLostCallback.called);
+ });
+ }); // stop
+
+ describe('_reconnect', function () {
+ it('reconnects', async function () {
+ await listener._reconnect(0, 1);
+ assert(listener.connection);
+ assert(options.connectionEstablishedCallback.called);
+ });
+ it('closes existing connection before reconnecting', async function () {
+ const existingConnection = {
+ done: sinon.stub(),
+ };
+ listener.connection = existingConnection;
+ await listener._reconnect(0, 1);
+ assert(existingConnection.done.called);
+ });
+ it('overrides a pending reconnect', async function () {
+ this.slow(300);
+ const pendingReconnect = sinon.stub();
+ listener.reconnectPending = setTimeout(pendingReconnect, 100);
+ await listener._reconnect(0, 1);
+ await snooze(110);
+ assert(!pendingReconnect.called);
+ });
+ it('fails with no remaining retries', async function () {
+ const expected = new Error('foo');
+ pgpStub.connect = sinon.stub().rejects(expected);
+ try {
+ await listener._reconnect(0, 0);
+ assert.fail(noExpectedException);
+ } catch (e) {
+ assert.deepStrictEqual(e, expected);
+ }
+ });
+ it('fails all remaining retries', async function () {
+ const expected = new Error('foo');
+ pgpStub.connect = sinon.stub().rejects(expected);
+ try {
+ await listener._reconnect(0, 1);
+ assert.fail(noExpectedException);
+ } catch (e) {
+ assert.deepStrictEqual(e, expected);
+ }
+ });
+ it('fails first retry', async function () {
+ const expected = new Error('foo');
+ pgpStub.connect = sinon.stub().onCall(0).rejects(expected).resolves(connectionStub);
+ await listener._reconnect(0, 1);
+ assert(options.connectionEstablishedCallback.called);
+ });
+ }); // _reconnect
+
+ describe('_onConnectionLost', function () {
+ let error, event;
+ beforeEach(function () {
+ error = new Error('blah');
+ event = connectionStub;
+ sinon.stub(listener, '_reconnect');
+ });
+ it('success', async function () {
+ await listener._onConnectionLost(error, event);
+ assert.strictEqual(listener.connection, null);
+ assert(event.client.removeListener.called);
+ assert(listener.options.connectionLostCallback.called);
+ assert(listener._reconnect.called);
+ });
+ it('covers reconnect failure', async function () {
+ listener._reconnect.rejects(error);
+ await listener._onConnectionLost(error, event);
+ assert.strictEqual(listener.connection, null);
+ assert(event.client.removeListener.called);
+ assert(listener.options.connectionLostCallback.called);
+ assert(listener._reconnect.called);
+ });
+ it('covers listener removal failure', async function () {
+ event.client.removeListener.throws(error);
+ await listener._onConnectionLost(error, event);
+ assert.strictEqual(listener.connection, null);
+ assert(event.client.removeListener.called);
+ assert(listener.options.connectionLostCallback.called);
+ assert(listener._reconnect.called);
+ });
+ }); // _onConnectionLost
+
+ describe('_onNotification', function () {
+ it('sends data', async function () {
+ const data = {
+ payload: 'foo',
+ };
+ await listener._onNotification(data);
+ assert(listener.options.dataCallback.called);
+ });
+ it('ignores pings', async function () {
+ const data = {
+ payload: 'ping',
+ };
+ await listener._onNotification(data);
+ assert(!listener.options.dataCallback.called);
+ });
+ }); // _onNotification
+
+ describe('_sendPing', function () {
+ it('covers no connection', async function () {
+ this.slow(300);
+ await listener._sendPing();
+ await snooze(110);
+ clearTimeout(listener.nextPingTimeout);
+ });
+ it('success', async function () {
+ this.slow(300);
+ listener.connection = connectionStub;
+ await listener._sendPing();
+ await snooze(110);
+ clearTimeout(listener.nextPingTimeout);
+ assert(connectionStub.none.called);
+ });
+ it('covers error', async function () {
+ const err = new Error('blah');
+ this.slow(300);
+ listener.connection = connectionStub;
+ listener.connection.none.rejects(err);
+ await listener._sendPing();
+ await snooze(110);
+ clearTimeout(listener.nextPingTimeout);
+ assert(listener.connection.none.called);
+
+ });
+ }); // _sendPing
+
+}); // Postgres Listener
sinon.restore();
});
+ it('covers listener', function () {
+ const listenerOptions = new Config('test');
+ listenerOptions.db.cacheEnabled = true;
+ const listenerDb = new DB(stubLogger, listenerOptions, pgpStub);
+ assert(listenerDb);
+ });
+
// Ensure all interface methods are implemented
describe('Implementation', function () {
it('implements interface', async function () {
db.pgpInitOptions.query(event);
assert(db.logger.debug.called);
});
+ it('covers NOTIFY', function () {
+ const event = { query: 'NOTIFY thing' };
+ db.pgpInitOptions.query(event);
+ assert(!db.logger.debug.called);
+ });
}); // query
describe('receive', function () {
it('covers', function () {
assert(db.logger.debug.called);
assert.deepStrictEqual(data, expectedData);
});
+ it('covers NOTIFY', function () {
+ const data = [
+ {
+ column_one: 'one', // eslint-disable-line camelcase
+ column_two: 2, // eslint-disable-line camelcase
+ },
+ {
+ column_one: 'foo', // eslint-disable-line camelcase
+ column_two: 4, // eslint-disable-line camelcase
+ },
+ ];
+ const result = {
+ command: 'NOTIFY',
+ };
+ const event = {};
+ const expectedData = [
+ {
+ columnOne: 'one',
+ columnTwo: 2,
+ },
+ {
+ columnOne: 'foo',
+ columnTwo: 4,
+ },
+ ];
+ db.pgpInitOptions.receive(data, result, event)
+ assert(!db.logger.debug.called);
+ assert.deepStrictEqual(data, expectedData);
+ });
}); // receive
}); // pgpInitOptions
}); // _initTables
describe('initialize', function () {
+ after(function () {
+ delete db.listener;
+ });
it('passes supported version', async function () {
const version = { major: 1, minor: 0, patch: 0 };
sinon.stub(db.db, 'one').resolves(version);
sinon.stub(db.db, 'one').resolves(db.schemaVersionsSupported.max);
await db.initialize();
});
+ it('covers listener', async function() {
+ db.listener = {
+ start: sinon.stub(),
+ };
+ const version = { major: 1, minor: 0, patch: 0 };
+ sinon.stub(db.db, 'one').resolves(version);
+ await db.initialize(false);
+ assert(db.listener.start.called);
+ });
}); // initialize
describe('healthCheck', function () {
}); // _queryFileHelper
describe('_closeConnection', function () {
+ after(function () {
+ delete db.listener;
+ });
it('success', async function () {
sinon.stub(db._pgp, 'end');
await db._closeConnection();
assert.deepStrictEqual(e, expected);
}
});
+ it('covers listener', async function () {
+ db.listener = {
+ stop: sinon.stub(),
+ };
+ sinon.stub(db._pgp, 'end');
+ await db._closeConnection();
+ assert(db._pgp.end.called);
+ });
}); // _closeConnection
describe('_purgeTables', function () {
});
}); // _purgeTables
+ describe('_topicChanged', function () {
+ beforeEach(function () {
+ db.cache = new Map();
+ sinon.stub(db.cache, 'delete');
+ });
+ after(function () {
+ delete db.cache;
+ });
+ it('covers', function () {
+ db._topicChanged('topic-id');
+ assert(db.cache.delete.called);
+ });
+ it('ignores ping', function () {
+ db._topicChanged('ping');
+ assert(!db.cache.delete.called);
+ });
+ }); // _topicChanged
+
+ describe('_listenerEstablished', function () {
+ it('creates cache', function () {
+ delete db.cache;
+ db._listenerEstablished();
+ assert(db.cache instanceof Map);
+ });
+ }); // _listenerEstablished
+
+ describe('_listenerLost', function () {
+ it('removes cache', function () {
+ db.cache = new Map();
+ db._listenerLost();
+ assert(!db.cache);
+ });
+ }); // _listenerLost
+
+ describe('_cacheGet', function () {
+ let key;
+ beforeEach(function () {
+ key = 'key';
+ });
+ it('nothing if no cache', function () {
+ delete db.cache;
+ const result = db._cacheGet(key);
+ assert.strictEqual(result, undefined);
+ });
+ it('nothing if no entry', function () {
+ db.cache = new Map();
+ const result = db._cacheGet(key);
+ assert.strictEqual(result, undefined);
+ });
+ it('returns cached entry', function () {
+ db.cache = new Map();
+ const expected = {
+ foo: 'bar',
+ };
+ db._cacheSet(key, expected);
+ const result = db._cacheGet(key);
+ assert.deepStrictEqual(result, expected);
+ });
+ }); // _cacheGet
+
+ describe('_cacheSet', function () {
+ let key;
+ beforeEach(function () {
+ key = 'key';
+ });
+ it('covers no cache', function () {
+ delete db.cache;
+ db._cacheSet(key, 'data');
+ });
+ it('covers cache', function () {
+ db.cache = new Map();
+ const expected = 'blah';
+ db._cacheSet(key, expected);
+ const result = db._cacheGet(key);
+ assert.deepStrictEqual(result, expected);
+ });
+ }); // _cacheSet
+
describe('context', function () {
it('covers', async function () {
await db.context(common.nop);
}); // topicGetByUrl
describe('topicGetContentById', function () {
+ let topic;
+ beforeEach(function () {
+ delete db.cache;
+ topic = {
+ id: topicId,
+ };
+ });
it('success', async function() {
- const expected = { id: topicId };
+ const expected = topic;
sinon.stub(db.db, 'oneOrNone').resolves(expected);
const result = await db.topicGetContentById(dbCtx, topicId);
assert.deepStrictEqual(result, expected);
assert.deepStrictEqual(e, expected);
}
});
+ it('caches success', async function () {
+ db.cache = new Map();
+ const expected = topic;
+ sinon.stub(db.db, 'oneOrNone').resolves(expected);
+ const result = await db.topicGetContentById(dbCtx, topicId);
+ assert.deepStrictEqual(result, expected);
+ });
+ it('covers cached entry', async function() {
+ let result;
+ db.cache = new Map();
+ const expected = topic;
+ sinon.stub(db.db, 'oneOrNone').resolves(expected);
+ result = await db.topicGetContentById(dbCtx, topicId);
+ assert.deepStrictEqual(result, expected);
+ result = await db.topicGetContentById(dbCtx, topicId);
+ assert.deepStrictEqual(result, expected);
+ });
}); // topicGetContentById
describe('topicSet', function () {
}
});
- });
+ }); // topicUpdate
describe('verificationClaim', function () {
it('success', async function() {