const Factory = require('./lib/factory');
const SchemaVersionHelper = require('./lib/schema-version-helper');
const PostgresCreator = require('./lib/postgres-creator');
+const PostgresListener = require('./lib/postgres-listener');
const SQLiteCreator = require('./lib/sqlite-creator');
const { validate } = require('./lib/validation');
const { interfaceMethods, stubCreator, stubPgp } = require('./test/stub');
SchemaVersionHelper,
SQLiteCreator,
PostgresCreator,
+ PostgresListener,
validate,
test: {
itChecksImplementation,
this._pgpInitOptions.error = (err, event) => {
errorLogger(errorScope, '', { err, event });
+ // TODO: close connection on err.code === '57P03' database shutting down
};
// Log queries
const queryScope = _fileScope('pgp:query');
queryLogger = this.logger[queryLogLevel]; // eslint-disable-line security/detect-object-injection
this._pgpInitOptions.query = (event) => {
+ // Quell outgoing pings
+ if (event?.query?.startsWith('NOTIFY')) {
+ return;
+ }
queryLogger(queryScope, '', {
query: event?.query,
params: event?.params,
}
}
if (queryLogLevel) {
+ // Quell outgoing pings
+ if (result && result.command === 'NOTIFY') {
+ return;
+ }
// Omitting .rows
queryLogger(resultScope, '', {
query: event?.query,
--- /dev/null
+'use strict';
+
+const { fileScope } = require('@squeep/log-helper');
+
+const _fileScope = fileScope(__filename);
+const nop = () => undefined;
+
+/**
+ * @typedef {object} PostgresListenerOptions
+ * @property {string} channel notification channel to listen to
+ * @property {(payload: string, channel: string) => Promise<void>} dataCallback callback for incoming data
+ * @property {() => Promise<void>} connectionLostCallback callback for connection lost events
+ * @property {() => Promise<void>} connectionEstablishedCallback callback for connection established events
+ * @property {number} pingDelayMs delay between pings in milliseconds
+ * @property {number} reconnectDelayMs delay before attempting to reconnect in milliseconds
+ * @property {number} reconnectTimes number of times to attempt reconnection before giving up, -1 for infinity
+ */
+
+/**
+ * @type {PostgresListenerOptions}
+ */
+const defaultOptions = {
+ channel: 'cache_invalidation',
+ dataCallback: nop,
+ connectionLostCallback: nop,
+ connectionEstablishedCallback: nop,
+ pingDelayMs: 5000,
+ reconnectDelayMs: 6000,
+ reconnectTimes: -1, // infinite retries by default
+};
+
+/**
+ * Create a robust connection which listens to a notification channel.
+ */
+class PostgresListener {
+ constructor(logger, db, options) {
+ this.logger = logger;
+ this.db = db;
+
+ this.options = {
+ ...defaultOptions,
+ ...options,
+ };
+ this.notificationEventName = 'notification';
+
+ this.connection = null;
+ this.nextPingTimeout = undefined;
+
+ // bound versions of these methods, as we pass them around
+ this._onConnectionLostBound = this._onConnectionLost.bind(this);
+ this._onNotificationBound = this._onNotification.bind(this);
+ }
+
+
+ /**
+ * Establish the listener connection.
+ */
+ async start() {
+ await this._reconnect(0, 1); // initial connection will fail after first attempt instead of retrying
+ this._sendPing();
+ }
+
+
+ /**
+ * Shut down the listener connection.
+ */
+ async stop() {
+ const _scope = _fileScope('stop');
+ if (this.reconnectPending) {
+ this.logger.debug(_scope, 'overriding existing reconnect retry');
+ clearTimeout(this.reconnectPending);
+ delete this.reconnectPending;
+ }
+ if (this.nextPingTimeout) {
+ clearTimeout(this.nextPingTimeout);
+ this.nextPingTimeout = undefined;
+ }
+ if (this.connection) {
+ this.connection.client.removeListener(this.notificationEventName, this._onNotificationBound);
+ await this.connection.done();
+ this.connection = null;
+ await this.options.connectionLostCallback();
+ }
+ }
+
+
+ /**
+ * Begin sending connection pings.
+ */
+ _sendPing() {
+ const _scope = _fileScope('_sendPing');
+ this.nextPingTimeout = setTimeout(async () => {
+ try {
+ if (this.connection) {
+ await this.connection.none('NOTIFY $(channel:name), $(payload)', { channel: this.options.channel, payload: 'ping' });
+ }
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e });
+ } finally {
+ this._sendPing();
+ }
+ }, this.options.pingDelayMs);
+ }
+
+
+ /**
+ * Notify callback.
+ * @param {object} data listener notification
+ */
+ async _onNotification(data) {
+ const _scope = _fileScope('_onNotification');
+ this.logger.debug(_scope, 'called', data);
+
+ const { channel, payload } = data;
+ // Ignore our own messages
+ if (payload === 'ping') {
+ return;
+ }
+ await this.options.dataCallback(payload, channel);
+ }
+
+
+ /**
+ * Notify callback and attempt to reconnect.
+ * @param {*} error error
+ * @param {*} event event
+ */
+ async _onConnectionLost(error, event) {
+ const _scope = _fileScope('_onConnectionLost');
+ this.logger.error(_scope, 'listener connection lost', { error, event });
+ this.connection = null;
+ try {
+ event.client.removeListener(this.notificationEventName, this._onNotificationBound);
+ } catch (e) {
+ this.logger.error(_scope, 'failed to remove listener', { error: e });
+ // That's okay, it was probably just gone anyhow.
+ }
+ await this.options.connectionLostCallback();
+ try {
+ await this._reconnect(this.options.reconnectDelayMs, this.options.reconnectTimes);
+ } catch (e) {
+ this.logger.error(_scope, 'failed to reconnect listener', { error: e });
+ }
+ }
+
+
+ /**
+ * Schedule an attempt to establish a connection.
+ * @param {number} delay reconnect delay
+ * @param {number} retriesRemaining retry countdown
+ */
+ async _reconnect(delay, retriesRemaining) {
+ const _scope = _fileScope('_reconnect');
+ if (this.connection) {
+ this.logger.debug(_scope, 'closing existing connection');
+ this.connection.done();
+ this.connection = null;
+ }
+ if (this.reconnectPending) {
+ this.logger.debug(_scope, 'overriding existing reconnect retry');
+ clearTimeout(this.reconnectPending);
+ }
+ return new Promise((resolve, reject) => {
+ this.reconnectPending = setTimeout(async () => {
+ try {
+ delete this.reconnectPending;
+ this.connection = await this.db.connect({
+ direct: true,
+ onLost: this._onConnectionLostBound,
+ });
+ this.connection.client.on(this.notificationEventName, this._onNotificationBound);
+ await this.connection.none('LISTEN $(channel:name)', { channel: this.options.channel });
+ this.logger.debug(_scope, 'listener connection established');
+ await this.options.connectionEstablishedCallback();
+ resolve();
+ } catch (e) {
+ if (retriesRemaining == 0) {
+ return reject(e);
+ }
+ try {
+ await this._reconnect(delay, retriesRemaining - 1);
+ resolve();
+ } catch (e2) {
+ reject(e2);
+ }
+ }
+ }, delay);
+ });
+ }
+
+}
+
+module.exports = PostgresListener;
\ No newline at end of file
const { Abstract } = require('../');
class AbstractIntegration extends Abstract {
- constructor(...args) {
- super(...args);
- if (!this._isProduction) {
- this._tableNames.push(...[
- 'almanac',
- ]);
- }
- }
// eslint-disable-next-line class-methods-use-this
get schemaVersionMin() {
const Abstract = require('../abstract');
const PostgresCreator = require('../../lib/postgres-creator');
+const PostgresListener = require('../../lib/postgres-listener');
const DBErrors = require('../../lib/errors');
const assert = require('node:assert');
constructor(...args) {
super(...args);
+ // Implementations should declare table names which integration tests will purge before running.
if (!this._isProduction) {
this._tableNames = ['almanac'];
}
+ this.listener = new PostgresListener(this.logger, this.db, {
+ channel: 'almanac_changed',
+ dataCallback: this._listenerDataCallback.bind(this),
+ connectionLostCallback: this._listenerConnectionLostCallback.bind(this),
+ connectionEstablishedCallback: this._listenerConnectionEstablishedCallback.bind(this),
+ });
}
+
async initialize(sqlPath = __dirname, applyMigrations = true) {
await super.initialize(sqlPath, applyMigrations);
+ await this.listener.start();
+ }
+
+
+ async _closeConnection() {
+ await this.listener.stop();
+ await super._closeConnection();
}
- async _engineSpecificTests() {
+
+ /**
+ * Integration test coverage for type handling and notification channels.
+ * @param {*} t mocha test context
+ */
+ async _engineSpecificTests(t) {
+ const _scope = '_engineSpecificTests';
// cover bigint array types
const result = await this.db.one('SELECT ARRAY[1, 2]::bigint[] as value');
const expected = {
value: [1n, 2n],
};
assert.deepStrictEqual(result, expected);
+
+ // Wait for listener pings
+ const pingMs = this.listener.options.pingDelayMs || 5000;;
+ t.timeout(pingMs * 3);
+ t.slow(pingMs * 2.5);
+ this.logger.debug(_scope, 'waiting 6s for listener ping to fire');
+ await new Promise((resolve) => setTimeout(resolve, pingMs * 1.2));
+ }
+
+
+ async _listenerConnectionEstablishedCallback() {
+ const _scope = 'listenerConnectionEstablishedCallback';
+ this.logger.debug(_scope, 'called');
+ this.cache = new Map(); // pretend it is a real cache
+ }
+
+
+ async _listenerConnectionLostCallback() {
+ const _scope = 'listenerConnectionLostCallback';
+ this.logger.debug(_scope, 'called');
+ delete this.cache;
+ }
+
+
+ async _listenerDataCallback(payload) {
+ const _scope = 'listenerDataCallback';
+ if (!this.cache) {
+ this.logger.error(_scope, 'no cache available');
+ return;
+ }
+ if (payload !== 'ping') {
+ this.logger.debug(_scope, 'called', { payload });
+ this.cache.delete(payload);
+ }
}
+
async almanacGetAll(dbCtx) {
const _scope = 'almanacGetAll';
this.logger.debug(_scope, 'called');
try {
- return await dbCtx.manyOrNone(this.statement.almanacGetAll);
+ // cache-invalidation over the set of all records is left as a challenge
+ const all = await dbCtx.manyOrNone(this.statement.almanacGetAll);
+ if (this.cache) {
+ for (const entry of all) {
+ this.cache.set(entry.event, entry.date);
+ }
+ }
+ return all;
} catch (error) {
this.logger.error(_scope, 'failed', { error });
throw error;
}
}
+
async almanacGet(dbCtx, event) {
const _scope = 'almanacGet';
this.logger.debug(_scope, 'called', { event });
try {
+ if (this.cache?.has(event)) {
+ this.logger.debug(_scope, 'cache hit', { event });
+ return this.cache.get(event);
+ }
const { date } = await dbCtx.oneOrNone(this.statement.almanacGet, { event });
+ this.cache?.set(event, date);
return date;
} catch (error) {
this.logger.error(_scope, 'failed', { error, event });
}
}
+
async almanacUpsert(dbCtx, event, date) {
const _scope = 'almanacUpsert';
this.logger.debug(_scope, 'called', { event, date });
COMMENT ON TABLE almanac IS $docstring$
Notable events for service administration.
$docstring$;
+
+CREATE OR REPLACE FUNCTION almanac_changed()
+RETURNS TRIGGER
+LANGUAGE plpgsql
+AS $$
+ DECLARE
+ payload varchar;
+ BEGIN
+ payload = CAST(NEW.event AS text);
+ PERFORM pg_notify('almanac_changed', payload);
+ RETURN NEW;
+ END;
+$$
+;
+
+CREATE OR REPLACE TRIGGER almanac_changed_trigger
+AFTER UPDATE OR DELETE ON almanac
+FOR EACH ROW
+ EXECUTE FUNCTION almanac_changed()
+;
\ No newline at end of file
describe('Engine Specific Tests', function () {
it('does things', async function () {
- await db._engineSpecificTests();
+ await db._engineSpecificTests(this);
});
}); // Engine Specific
let events, date1String, date2String;
beforeEach(function () {
date1String = 'Oct 28 2023 13:24 PDT';
- date2String = '2023-09-29T09:58:00.000Z';
+ date2String = '2023-09-29T09:58:00.000Z';
events = [
{ event: 'event1', date: new Date(date1String) },
{ event: 'event2', date: new Date(date2String) },
await db.almanacUpsert(dbCtx, event, date);
});
});
-
+
step('fetch a record', async function () {
const { event, date: expected } = events[0];
await db.context(async (dbCtx) => {
const date = await db.almanacGet(dbCtx, event);
assert.deepStrictEqual(date, expected);
+ if (db.cache) {
+ assert(db.cache.has(event));
+ }
});
});
-
+
+ step('fetch a cached record', async function () {
+ const { event, date: expected } = events[0];
+ await db.context(async (dbCtx) => {
+ const date = await db.almanacGet(dbCtx, event);
+ assert.deepStrictEqual(date, expected);
+ if (db.cache) {
+ assert(db.cache.has(event));
+ }
+ });
+ });
+
step('add another record', async function () {
const { event, date } = events[1];
await db.context(async (dbCtx) => {
await db.almanacUpsert(dbCtx, event, date);
});
});
-
+
step('fetch all records', async function () {
const expected = events;
await db.context(async (dbCtx) => {
const allEvents = await db.almanacGetAll(dbCtx);
assert.deepStrictEqual(allEvents, expected);
});
- });
+ });
step('transaction', async function () {
const expected = events;
db._pgpInitOptions.query(event);
assert(db.logger.debug.called);
});
+ it('does not log NOTIFY', function () {
+ const event = { query: 'NOTIFY ping' };
+ db._pgpInitOptions.query(event);
+ assert(!db.logger.debug.called);
+ });
}); // query
describe('receive', function () {
it('covers', function () {
assert(db.logger.debug.called);
assert.deepStrictEqual(data, expectedData);
});
+ it('does not log NOTIFY', function () {
+ const data = [];
+ const result = { command: 'NOTIFY' };
+ const event = {};
+ db._pgpInitOptions.receive({ data, result, ctx: event });
+ assert(!db.logger.debug.called);
+ });
it('covers no query logging', function () {
delete options.db.queryLogLevel;
db = new DatabasePostgres(stubLogger, options, stubPgp);
--- /dev/null
+/* eslint-env mocha */
+'use strict';
+
+const assert = require('node:assert');
+const sinon = require('sinon');
+const { StubLogger } = require('@squeep/test-helper');
+const Listener = require('../../lib/postgres-listener');
+
+const snooze = async (ms) => new Promise((resolve) => setTimeout(resolve, ms));
+const noExpectedException = 'did not get expected exception';
+
+describe('Postgres Listener', function () {
+ let listener, options, connectionStub, pgpStub, stubLogger;
+ beforeEach(function () {
+ stubLogger = new StubLogger(sinon);
+ connectionStub = {
+ client: {
+ on: sinon.stub(),
+ removeListener: sinon.stub(),
+ },
+ done: sinon.stub(),
+ none: sinon.stub(),
+ };
+ pgpStub = {
+ connect: sinon.stub().resolves(connectionStub),
+ };
+ options = {
+ dataCallback: sinon.stub(),
+ connectionLostCallback: sinon.stub(),
+ connectionEstablishedCallback: sinon.stub(),
+ pingDelayMs: 100,
+ reconnectDelayMs: 1000,
+ reconnectTimes: 1,
+ };
+ listener = new Listener(stubLogger, pgpStub, options);
+ });
+ afterEach(function () {
+ sinon.restore();
+ });
+
+ describe('start', function () {
+ it('covers', async function () {
+ sinon.stub(listener, '_reconnect').resolves();
+ sinon.stub(listener, '_sendPing').resolves();
+ await listener.start();
+ assert(listener._reconnect.called);
+ assert(listener._sendPing.called);
+ });
+ }); // start
+
+ describe('stop', function () {
+ it('covers not started', async function () {
+ await listener.stop();
+ });
+ it('cancels pending reconnect', async function() {
+ this.slow(300);
+ const pendingReconnect = sinon.stub();
+ listener.reconnectPending = setTimeout(pendingReconnect, 100);
+ await listener.stop();
+ await snooze(110);
+ assert(!pendingReconnect.called);
+ });
+ it('cancels pending ping', async function() {
+ this.slow(300);
+ const nextPing = sinon.stub();
+ listener.nextPingTimeout = setTimeout(nextPing, 100);
+ await listener.stop();
+ await snooze(110);
+ assert(!nextPing.called);
+ });
+
+ it('closes existing connection', async function () {
+ listener.connection = connectionStub;
+ await listener.stop();
+ assert(connectionStub.client.removeListener.called);
+ assert.strictEqual(listener.connection, null);
+ assert(options.connectionLostCallback.called);
+ });
+ }); // stop
+
+ describe('_reconnect', function () {
+ it('reconnects', async function () {
+ await listener._reconnect(0, 1);
+ assert(listener.connection);
+ assert(options.connectionEstablishedCallback.called);
+ });
+ it('closes existing connection before reconnecting', async function () {
+ const existingConnection = {
+ done: sinon.stub(),
+ };
+ listener.connection = existingConnection;
+ await listener._reconnect(0, 1);
+ assert(existingConnection.done.called);
+ });
+ it('overrides a pending reconnect', async function () {
+ this.slow(300);
+ const pendingReconnect = sinon.stub();
+ listener.reconnectPending = setTimeout(pendingReconnect, 100);
+ await listener._reconnect(0, 1);
+ await snooze(110);
+ assert(!pendingReconnect.called);
+ });
+ it('fails with no remaining retries', async function () {
+ const expected = new Error('foo');
+ pgpStub.connect = sinon.stub().rejects(expected);
+ try {
+ await listener._reconnect(0, 0);
+ assert.fail(noExpectedException);
+ } catch (e) {
+ assert.deepStrictEqual(e, expected);
+ }
+ });
+ it('fails all remaining retries', async function () {
+ const expected = new Error('foo');
+ pgpStub.connect = sinon.stub().rejects(expected);
+ try {
+ await listener._reconnect(0, 1);
+ assert.fail(noExpectedException);
+ } catch (e) {
+ assert.deepStrictEqual(e, expected);
+ }
+ });
+ it('fails first retry', async function () {
+ const expected = new Error('foo');
+ pgpStub.connect = sinon.stub().onCall(0).rejects(expected).resolves(connectionStub);
+ await listener._reconnect(0, 1);
+ assert(options.connectionEstablishedCallback.called);
+ });
+ }); // _reconnect
+
+ describe('_onConnectionLost', function () {
+ let error, event;
+ beforeEach(function () {
+ error = new Error('blah');
+ event = connectionStub;
+ sinon.stub(listener, '_reconnect');
+ });
+ it('success', async function () {
+ await listener._onConnectionLost(error, event);
+ assert.strictEqual(listener.connection, null);
+ assert(event.client.removeListener.called);
+ assert(listener.options.connectionLostCallback.called);
+ assert(listener._reconnect.called);
+ });
+ it('covers reconnect failure', async function () {
+ listener._reconnect.rejects(error);
+ await listener._onConnectionLost(error, event);
+ assert.strictEqual(listener.connection, null);
+ assert(event.client.removeListener.called);
+ assert(listener.options.connectionLostCallback.called);
+ assert(listener._reconnect.called);
+ });
+ it('covers listener removal failure', async function () {
+ event.client.removeListener.throws(error);
+ await listener._onConnectionLost(error, event);
+ assert.strictEqual(listener.connection, null);
+ assert(event.client.removeListener.called);
+ assert(listener.options.connectionLostCallback.called);
+ assert(listener._reconnect.called);
+ });
+ }); // _onConnectionLost
+
+ describe('_onNotification', function () {
+ it('sends data', async function () {
+ const data = {
+ payload: 'foo',
+ };
+ await listener._onNotification(data);
+ assert(listener.options.dataCallback.called);
+ });
+ it('ignores pings', async function () {
+ const data = {
+ payload: 'ping',
+ };
+ await listener._onNotification(data);
+ assert(!listener.options.dataCallback.called);
+ });
+ }); // _onNotification
+
+ describe('_sendPing', function () {
+ it('covers no connection', async function () {
+ this.slow(300);
+ await listener._sendPing();
+ await snooze(110);
+ clearTimeout(listener.nextPingTimeout);
+ });
+ it('success', async function () {
+ this.slow(300);
+ listener.connection = connectionStub;
+ await listener._sendPing();
+ await snooze(110);
+ clearTimeout(listener.nextPingTimeout);
+ assert(connectionStub.none.called);
+ });
+ it('covers error', async function () {
+ const err = new Error('blah');
+ this.slow(300);
+ listener.connection = connectionStub;
+ listener.connection.none.rejects(err);
+ await listener._sendPing();
+ await snooze(110);
+ clearTimeout(listener.nextPingTimeout);
+ assert(listener.connection.none.called);
+
+ });
+ }); // _sendPing
+
+}); // Postgres Listener