From 7320d3b351fc68e1d23ce24a6775860dd84ea9b4 Mon Sep 17 00:00:00 2001 From: Justin Wind Date: Fri, 27 Jun 2025 13:59:07 -0700 Subject: [PATCH] add postgres listener --- index.js | 2 + lib/postgres-creator.js | 9 + lib/postgres-listener.js | 193 ++++++++++++++++ test-integration/abstract.js | 8 - test-integration/postgresql/index.js | 75 ++++++- .../postgresql/sql/schema/1.0.0/apply.sql | 20 ++ test/integration.js | 26 ++- test/lib/postgres-creator.js | 12 + test/lib/postgres-listener.js | 208 ++++++++++++++++++ 9 files changed, 537 insertions(+), 16 deletions(-) create mode 100644 lib/postgres-listener.js create mode 100644 test/lib/postgres-listener.js diff --git a/index.js b/index.js index ca29e39..daa46c1 100644 --- a/index.js +++ b/index.js @@ -6,6 +6,7 @@ const Errors = require('./lib/errors'); const Factory = require('./lib/factory'); const SchemaVersionHelper = require('./lib/schema-version-helper'); const PostgresCreator = require('./lib/postgres-creator'); +const PostgresListener = require('./lib/postgres-listener'); const SQLiteCreator = require('./lib/sqlite-creator'); const { validate } = require('./lib/validation'); const { interfaceMethods, stubCreator, stubPgp } = require('./test/stub'); @@ -19,6 +20,7 @@ module.exports = { SchemaVersionHelper, SQLiteCreator, PostgresCreator, + PostgresListener, validate, test: { itChecksImplementation, diff --git a/lib/postgres-creator.js b/lib/postgres-creator.js index 43715ac..5e124aa 100644 --- a/lib/postgres-creator.js +++ b/lib/postgres-creator.js @@ -197,6 +197,7 @@ const PostgresCreator = (Abstract) => { this._pgpInitOptions.error = (err, event) => { errorLogger(errorScope, '', { err, event }); + // TODO: close connection on err.code === '57P03' database shutting down }; // Log queries @@ -205,6 +206,10 @@ const PostgresCreator = (Abstract) => { const queryScope = _fileScope('pgp:query'); queryLogger = this.logger[queryLogLevel]; // eslint-disable-line security/detect-object-injection this._pgpInitOptions.query = (event) => { + // Quell outgoing pings + if (event?.query?.startsWith('NOTIFY')) { + return; + } queryLogger(queryScope, '', { query: event?.query, params: event?.params, @@ -226,6 +231,10 @@ const PostgresCreator = (Abstract) => { } } if (queryLogLevel) { + // Quell outgoing pings + if (result && result.command === 'NOTIFY') { + return; + } // Omitting .rows queryLogger(resultScope, '', { query: event?.query, diff --git a/lib/postgres-listener.js b/lib/postgres-listener.js new file mode 100644 index 0000000..9810eed --- /dev/null +++ b/lib/postgres-listener.js @@ -0,0 +1,193 @@ +'use strict'; + +const { fileScope } = require('@squeep/log-helper'); + +const _fileScope = fileScope(__filename); +const nop = () => undefined; + +/** + * @typedef {object} PostgresListenerOptions + * @property {string} channel notification channel to listen to + * @property {(payload: string, channel: string) => Promise} dataCallback callback for incoming data + * @property {() => Promise} connectionLostCallback callback for connection lost events + * @property {() => Promise} connectionEstablishedCallback callback for connection established events + * @property {number} pingDelayMs delay between pings in milliseconds + * @property {number} reconnectDelayMs delay before attempting to reconnect in milliseconds + * @property {number} reconnectTimes number of times to attempt reconnection before giving up, -1 for infinity + */ + +/** + * @type {PostgresListenerOptions} + */ +const defaultOptions = { + channel: 'cache_invalidation', + dataCallback: nop, + connectionLostCallback: nop, + connectionEstablishedCallback: nop, + pingDelayMs: 5000, + reconnectDelayMs: 6000, + reconnectTimes: -1, // infinite retries by default +}; + +/** + * Create a robust connection which listens to a notification channel. + */ +class PostgresListener { + constructor(logger, db, options) { + this.logger = logger; + this.db = db; + + this.options = { + ...defaultOptions, + ...options, + }; + this.notificationEventName = 'notification'; + + this.connection = null; + this.nextPingTimeout = undefined; + + // bound versions of these methods, as we pass them around + this._onConnectionLostBound = this._onConnectionLost.bind(this); + this._onNotificationBound = this._onNotification.bind(this); + } + + + /** + * Establish the listener connection. + */ + async start() { + await this._reconnect(0, 1); // initial connection will fail after first attempt instead of retrying + this._sendPing(); + } + + + /** + * Shut down the listener connection. + */ + async stop() { + const _scope = _fileScope('stop'); + if (this.reconnectPending) { + this.logger.debug(_scope, 'overriding existing reconnect retry'); + clearTimeout(this.reconnectPending); + delete this.reconnectPending; + } + if (this.nextPingTimeout) { + clearTimeout(this.nextPingTimeout); + this.nextPingTimeout = undefined; + } + if (this.connection) { + this.connection.client.removeListener(this.notificationEventName, this._onNotificationBound); + await this.connection.done(); + this.connection = null; + await this.options.connectionLostCallback(); + } + } + + + /** + * Begin sending connection pings. + */ + _sendPing() { + const _scope = _fileScope('_sendPing'); + this.nextPingTimeout = setTimeout(async () => { + try { + if (this.connection) { + await this.connection.none('NOTIFY $(channel:name), $(payload)', { channel: this.options.channel, payload: 'ping' }); + } + } catch (e) { + this.logger.error(_scope, 'failed', { error: e }); + } finally { + this._sendPing(); + } + }, this.options.pingDelayMs); + } + + + /** + * Notify callback. + * @param {object} data listener notification + */ + async _onNotification(data) { + const _scope = _fileScope('_onNotification'); + this.logger.debug(_scope, 'called', data); + + const { channel, payload } = data; + // Ignore our own messages + if (payload === 'ping') { + return; + } + await this.options.dataCallback(payload, channel); + } + + + /** + * Notify callback and attempt to reconnect. + * @param {*} error error + * @param {*} event event + */ + async _onConnectionLost(error, event) { + const _scope = _fileScope('_onConnectionLost'); + this.logger.error(_scope, 'listener connection lost', { error, event }); + this.connection = null; + try { + event.client.removeListener(this.notificationEventName, this._onNotificationBound); + } catch (e) { + this.logger.error(_scope, 'failed to remove listener', { error: e }); + // That's okay, it was probably just gone anyhow. + } + await this.options.connectionLostCallback(); + try { + await this._reconnect(this.options.reconnectDelayMs, this.options.reconnectTimes); + } catch (e) { + this.logger.error(_scope, 'failed to reconnect listener', { error: e }); + } + } + + + /** + * Schedule an attempt to establish a connection. + * @param {number} delay reconnect delay + * @param {number} retriesRemaining retry countdown + */ + async _reconnect(delay, retriesRemaining) { + const _scope = _fileScope('_reconnect'); + if (this.connection) { + this.logger.debug(_scope, 'closing existing connection'); + this.connection.done(); + this.connection = null; + } + if (this.reconnectPending) { + this.logger.debug(_scope, 'overriding existing reconnect retry'); + clearTimeout(this.reconnectPending); + } + return new Promise((resolve, reject) => { + this.reconnectPending = setTimeout(async () => { + try { + delete this.reconnectPending; + this.connection = await this.db.connect({ + direct: true, + onLost: this._onConnectionLostBound, + }); + this.connection.client.on(this.notificationEventName, this._onNotificationBound); + await this.connection.none('LISTEN $(channel:name)', { channel: this.options.channel }); + this.logger.debug(_scope, 'listener connection established'); + await this.options.connectionEstablishedCallback(); + resolve(); + } catch (e) { + if (retriesRemaining == 0) { + return reject(e); + } + try { + await this._reconnect(delay, retriesRemaining - 1); + resolve(); + } catch (e2) { + reject(e2); + } + } + }, delay); + }); + } + +} + +module.exports = PostgresListener; \ No newline at end of file diff --git a/test-integration/abstract.js b/test-integration/abstract.js index 82c9ac0..636bff9 100644 --- a/test-integration/abstract.js +++ b/test-integration/abstract.js @@ -4,14 +4,6 @@ const { Abstract } = require('../'); class AbstractIntegration extends Abstract { - constructor(...args) { - super(...args); - if (!this._isProduction) { - this._tableNames.push(...[ - 'almanac', - ]); - } - } // eslint-disable-next-line class-methods-use-this get schemaVersionMin() { diff --git a/test-integration/postgresql/index.js b/test-integration/postgresql/index.js index fbc9d8a..513551c 100644 --- a/test-integration/postgresql/index.js +++ b/test-integration/postgresql/index.js @@ -3,6 +3,7 @@ const Abstract = require('../abstract'); const PostgresCreator = require('../../lib/postgres-creator'); +const PostgresListener = require('../../lib/postgres-listener'); const DBErrors = require('../../lib/errors'); const assert = require('node:assert'); @@ -13,40 +14,109 @@ class PostgresDB extends PostgresCreator(Abstract) { constructor(...args) { super(...args); + // Implementations should declare table names which integration tests will purge before running. if (!this._isProduction) { this._tableNames = ['almanac']; } + this.listener = new PostgresListener(this.logger, this.db, { + channel: 'almanac_changed', + dataCallback: this._listenerDataCallback.bind(this), + connectionLostCallback: this._listenerConnectionLostCallback.bind(this), + connectionEstablishedCallback: this._listenerConnectionEstablishedCallback.bind(this), + }); } + async initialize(sqlPath = __dirname, applyMigrations = true) { await super.initialize(sqlPath, applyMigrations); + await this.listener.start(); + } + + + async _closeConnection() { + await this.listener.stop(); + await super._closeConnection(); } - async _engineSpecificTests() { + + /** + * Integration test coverage for type handling and notification channels. + * @param {*} t mocha test context + */ + async _engineSpecificTests(t) { + const _scope = '_engineSpecificTests'; // cover bigint array types const result = await this.db.one('SELECT ARRAY[1, 2]::bigint[] as value'); const expected = { value: [1n, 2n], }; assert.deepStrictEqual(result, expected); + + // Wait for listener pings + const pingMs = this.listener.options.pingDelayMs || 5000;; + t.timeout(pingMs * 3); + t.slow(pingMs * 2.5); + this.logger.debug(_scope, 'waiting 6s for listener ping to fire'); + await new Promise((resolve) => setTimeout(resolve, pingMs * 1.2)); + } + + + async _listenerConnectionEstablishedCallback() { + const _scope = 'listenerConnectionEstablishedCallback'; + this.logger.debug(_scope, 'called'); + this.cache = new Map(); // pretend it is a real cache + } + + + async _listenerConnectionLostCallback() { + const _scope = 'listenerConnectionLostCallback'; + this.logger.debug(_scope, 'called'); + delete this.cache; + } + + + async _listenerDataCallback(payload) { + const _scope = 'listenerDataCallback'; + if (!this.cache) { + this.logger.error(_scope, 'no cache available'); + return; + } + if (payload !== 'ping') { + this.logger.debug(_scope, 'called', { payload }); + this.cache.delete(payload); + } } + async almanacGetAll(dbCtx) { const _scope = 'almanacGetAll'; this.logger.debug(_scope, 'called'); try { - return await dbCtx.manyOrNone(this.statement.almanacGetAll); + // cache-invalidation over the set of all records is left as a challenge + const all = await dbCtx.manyOrNone(this.statement.almanacGetAll); + if (this.cache) { + for (const entry of all) { + this.cache.set(entry.event, entry.date); + } + } + return all; } catch (error) { this.logger.error(_scope, 'failed', { error }); throw error; } } + async almanacGet(dbCtx, event) { const _scope = 'almanacGet'; this.logger.debug(_scope, 'called', { event }); try { + if (this.cache?.has(event)) { + this.logger.debug(_scope, 'cache hit', { event }); + return this.cache.get(event); + } const { date } = await dbCtx.oneOrNone(this.statement.almanacGet, { event }); + this.cache?.set(event, date); return date; } catch (error) { this.logger.error(_scope, 'failed', { error, event }); @@ -54,6 +124,7 @@ class PostgresDB extends PostgresCreator(Abstract) { } } + async almanacUpsert(dbCtx, event, date) { const _scope = 'almanacUpsert'; this.logger.debug(_scope, 'called', { event, date }); diff --git a/test-integration/postgresql/sql/schema/1.0.0/apply.sql b/test-integration/postgresql/sql/schema/1.0.0/apply.sql index bedbf35..98e5567 100644 --- a/test-integration/postgresql/sql/schema/1.0.0/apply.sql +++ b/test-integration/postgresql/sql/schema/1.0.0/apply.sql @@ -5,3 +5,23 @@ CREATE TABLE almanac ( COMMENT ON TABLE almanac IS $docstring$ Notable events for service administration. $docstring$; + +CREATE OR REPLACE FUNCTION almanac_changed() +RETURNS TRIGGER +LANGUAGE plpgsql +AS $$ + DECLARE + payload varchar; + BEGIN + payload = CAST(NEW.event AS text); + PERFORM pg_notify('almanac_changed', payload); + RETURN NEW; + END; +$$ +; + +CREATE OR REPLACE TRIGGER almanac_changed_trigger +AFTER UPDATE OR DELETE ON almanac +FOR EACH ROW + EXECUTE FUNCTION almanac_changed() +; \ No newline at end of file diff --git a/test/integration.js b/test/integration.js index a72eb31..1e515d3 100644 --- a/test/integration.js +++ b/test/integration.js @@ -88,7 +88,7 @@ describe('Database Integration', function () { describe('Engine Specific Tests', function () { it('does things', async function () { - await db._engineSpecificTests(); + await db._engineSpecificTests(this); }); }); // Engine Specific @@ -96,7 +96,7 @@ describe('Database Integration', function () { let events, date1String, date2String; beforeEach(function () { date1String = 'Oct 28 2023 13:24 PDT'; - date2String = '2023-09-29T09:58:00.000Z'; + date2String = '2023-09-29T09:58:00.000Z'; events = [ { event: 'event1', date: new Date(date1String) }, { event: 'event2', date: new Date(date2String) }, @@ -109,29 +109,43 @@ describe('Database Integration', function () { await db.almanacUpsert(dbCtx, event, date); }); }); - + step('fetch a record', async function () { const { event, date: expected } = events[0]; await db.context(async (dbCtx) => { const date = await db.almanacGet(dbCtx, event); assert.deepStrictEqual(date, expected); + if (db.cache) { + assert(db.cache.has(event)); + } }); }); - + + step('fetch a cached record', async function () { + const { event, date: expected } = events[0]; + await db.context(async (dbCtx) => { + const date = await db.almanacGet(dbCtx, event); + assert.deepStrictEqual(date, expected); + if (db.cache) { + assert(db.cache.has(event)); + } + }); + }); + step('add another record', async function () { const { event, date } = events[1]; await db.context(async (dbCtx) => { await db.almanacUpsert(dbCtx, event, date); }); }); - + step('fetch all records', async function () { const expected = events; await db.context(async (dbCtx) => { const allEvents = await db.almanacGetAll(dbCtx); assert.deepStrictEqual(allEvents, expected); }); - }); + }); step('transaction', async function () { const expected = events; diff --git a/test/lib/postgres-creator.js b/test/lib/postgres-creator.js index a33e89b..c8b071a 100644 --- a/test/lib/postgres-creator.js +++ b/test/lib/postgres-creator.js @@ -64,6 +64,11 @@ describe('Postgres Creator', function () { db._pgpInitOptions.query(event); assert(db.logger.debug.called); }); + it('does not log NOTIFY', function () { + const event = { query: 'NOTIFY ping' }; + db._pgpInitOptions.query(event); + assert(!db.logger.debug.called); + }); }); // query describe('receive', function () { it('covers', function () { @@ -97,6 +102,13 @@ describe('Postgres Creator', function () { assert(db.logger.debug.called); assert.deepStrictEqual(data, expectedData); }); + it('does not log NOTIFY', function () { + const data = []; + const result = { command: 'NOTIFY' }; + const event = {}; + db._pgpInitOptions.receive({ data, result, ctx: event }); + assert(!db.logger.debug.called); + }); it('covers no query logging', function () { delete options.db.queryLogLevel; db = new DatabasePostgres(stubLogger, options, stubPgp); diff --git a/test/lib/postgres-listener.js b/test/lib/postgres-listener.js new file mode 100644 index 0000000..0d69bd1 --- /dev/null +++ b/test/lib/postgres-listener.js @@ -0,0 +1,208 @@ +/* eslint-env mocha */ +'use strict'; + +const assert = require('node:assert'); +const sinon = require('sinon'); +const { StubLogger } = require('@squeep/test-helper'); +const Listener = require('../../lib/postgres-listener'); + +const snooze = async (ms) => new Promise((resolve) => setTimeout(resolve, ms)); +const noExpectedException = 'did not get expected exception'; + +describe('Postgres Listener', function () { + let listener, options, connectionStub, pgpStub, stubLogger; + beforeEach(function () { + stubLogger = new StubLogger(sinon); + connectionStub = { + client: { + on: sinon.stub(), + removeListener: sinon.stub(), + }, + done: sinon.stub(), + none: sinon.stub(), + }; + pgpStub = { + connect: sinon.stub().resolves(connectionStub), + }; + options = { + dataCallback: sinon.stub(), + connectionLostCallback: sinon.stub(), + connectionEstablishedCallback: sinon.stub(), + pingDelayMs: 100, + reconnectDelayMs: 1000, + reconnectTimes: 1, + }; + listener = new Listener(stubLogger, pgpStub, options); + }); + afterEach(function () { + sinon.restore(); + }); + + describe('start', function () { + it('covers', async function () { + sinon.stub(listener, '_reconnect').resolves(); + sinon.stub(listener, '_sendPing').resolves(); + await listener.start(); + assert(listener._reconnect.called); + assert(listener._sendPing.called); + }); + }); // start + + describe('stop', function () { + it('covers not started', async function () { + await listener.stop(); + }); + it('cancels pending reconnect', async function() { + this.slow(300); + const pendingReconnect = sinon.stub(); + listener.reconnectPending = setTimeout(pendingReconnect, 100); + await listener.stop(); + await snooze(110); + assert(!pendingReconnect.called); + }); + it('cancels pending ping', async function() { + this.slow(300); + const nextPing = sinon.stub(); + listener.nextPingTimeout = setTimeout(nextPing, 100); + await listener.stop(); + await snooze(110); + assert(!nextPing.called); + }); + + it('closes existing connection', async function () { + listener.connection = connectionStub; + await listener.stop(); + assert(connectionStub.client.removeListener.called); + assert.strictEqual(listener.connection, null); + assert(options.connectionLostCallback.called); + }); + }); // stop + + describe('_reconnect', function () { + it('reconnects', async function () { + await listener._reconnect(0, 1); + assert(listener.connection); + assert(options.connectionEstablishedCallback.called); + }); + it('closes existing connection before reconnecting', async function () { + const existingConnection = { + done: sinon.stub(), + }; + listener.connection = existingConnection; + await listener._reconnect(0, 1); + assert(existingConnection.done.called); + }); + it('overrides a pending reconnect', async function () { + this.slow(300); + const pendingReconnect = sinon.stub(); + listener.reconnectPending = setTimeout(pendingReconnect, 100); + await listener._reconnect(0, 1); + await snooze(110); + assert(!pendingReconnect.called); + }); + it('fails with no remaining retries', async function () { + const expected = new Error('foo'); + pgpStub.connect = sinon.stub().rejects(expected); + try { + await listener._reconnect(0, 0); + assert.fail(noExpectedException); + } catch (e) { + assert.deepStrictEqual(e, expected); + } + }); + it('fails all remaining retries', async function () { + const expected = new Error('foo'); + pgpStub.connect = sinon.stub().rejects(expected); + try { + await listener._reconnect(0, 1); + assert.fail(noExpectedException); + } catch (e) { + assert.deepStrictEqual(e, expected); + } + }); + it('fails first retry', async function () { + const expected = new Error('foo'); + pgpStub.connect = sinon.stub().onCall(0).rejects(expected).resolves(connectionStub); + await listener._reconnect(0, 1); + assert(options.connectionEstablishedCallback.called); + }); + }); // _reconnect + + describe('_onConnectionLost', function () { + let error, event; + beforeEach(function () { + error = new Error('blah'); + event = connectionStub; + sinon.stub(listener, '_reconnect'); + }); + it('success', async function () { + await listener._onConnectionLost(error, event); + assert.strictEqual(listener.connection, null); + assert(event.client.removeListener.called); + assert(listener.options.connectionLostCallback.called); + assert(listener._reconnect.called); + }); + it('covers reconnect failure', async function () { + listener._reconnect.rejects(error); + await listener._onConnectionLost(error, event); + assert.strictEqual(listener.connection, null); + assert(event.client.removeListener.called); + assert(listener.options.connectionLostCallback.called); + assert(listener._reconnect.called); + }); + it('covers listener removal failure', async function () { + event.client.removeListener.throws(error); + await listener._onConnectionLost(error, event); + assert.strictEqual(listener.connection, null); + assert(event.client.removeListener.called); + assert(listener.options.connectionLostCallback.called); + assert(listener._reconnect.called); + }); + }); // _onConnectionLost + + describe('_onNotification', function () { + it('sends data', async function () { + const data = { + payload: 'foo', + }; + await listener._onNotification(data); + assert(listener.options.dataCallback.called); + }); + it('ignores pings', async function () { + const data = { + payload: 'ping', + }; + await listener._onNotification(data); + assert(!listener.options.dataCallback.called); + }); + }); // _onNotification + + describe('_sendPing', function () { + it('covers no connection', async function () { + this.slow(300); + await listener._sendPing(); + await snooze(110); + clearTimeout(listener.nextPingTimeout); + }); + it('success', async function () { + this.slow(300); + listener.connection = connectionStub; + await listener._sendPing(); + await snooze(110); + clearTimeout(listener.nextPingTimeout); + assert(connectionStub.none.called); + }); + it('covers error', async function () { + const err = new Error('blah'); + this.slow(300); + listener.connection = connectionStub; + listener.connection.none.rejects(err); + await listener._sendPing(); + await snooze(110); + clearTimeout(listener.nextPingTimeout); + assert(listener.connection.none.called); + + }); + }); // _sendPing + +}); // Postgres Listener -- 2.49.0