add postgres listener
authorJustin Wind <justin.wind+git@gmail.com>
Fri, 27 Jun 2025 20:59:07 +0000 (13:59 -0700)
committerJustin Wind <justin.wind+git@gmail.com>
Sat, 28 Jun 2025 20:25:16 +0000 (13:25 -0700)
index.js
lib/postgres-creator.js
lib/postgres-listener.js [new file with mode: 0644]
test-integration/abstract.js
test-integration/postgresql/index.js
test-integration/postgresql/sql/schema/1.0.0/apply.sql
test/integration.js
test/lib/postgres-creator.js
test/lib/postgres-listener.js [new file with mode: 0644]

index ca29e3987217572de23af54dd19ea55260431254..daa46c1f6fa131d236a11aeab52d72b60902fde1 100644 (file)
--- a/index.js
+++ b/index.js
@@ -6,6 +6,7 @@ const Errors = require('./lib/errors');
 const Factory = require('./lib/factory');
 const SchemaVersionHelper = require('./lib/schema-version-helper');
 const PostgresCreator = require('./lib/postgres-creator');
+const PostgresListener = require('./lib/postgres-listener');
 const SQLiteCreator = require('./lib/sqlite-creator');
 const { validate } = require('./lib/validation');
 const { interfaceMethods, stubCreator, stubPgp } = require('./test/stub');
@@ -19,6 +20,7 @@ module.exports = {
   SchemaVersionHelper,
   SQLiteCreator,
   PostgresCreator,
+  PostgresListener,
   validate,
   test: {
     itChecksImplementation,
index 43715ac1c336bb5f3db8bfdcaaa955c85ca56226..5e124aafdb4d1576d42bd4d8bd8433dcf494f666 100644 (file)
@@ -197,6 +197,7 @@ const PostgresCreator = (Abstract) => {
 
       this._pgpInitOptions.error = (err, event) => {
         errorLogger(errorScope, '', { err, event });
+        // TODO: close connection on err.code === '57P03' database shutting down
       };
 
       // Log queries
@@ -205,6 +206,10 @@ const PostgresCreator = (Abstract) => {
         const queryScope = _fileScope('pgp:query');
         queryLogger = this.logger[queryLogLevel]; // eslint-disable-line security/detect-object-injection
         this._pgpInitOptions.query = (event) => {
+          // Quell outgoing pings
+          if (event?.query?.startsWith('NOTIFY')) {
+            return;
+          }
           queryLogger(queryScope, '', {
             query: event?.query,
             params: event?.params,
@@ -226,6 +231,10 @@ const PostgresCreator = (Abstract) => {
           }
         }
         if (queryLogLevel) {
+          // Quell outgoing pings
+          if (result && result.command === 'NOTIFY') {
+            return;
+          }
           // Omitting .rows
           queryLogger(resultScope, '', {
             query: event?.query,
diff --git a/lib/postgres-listener.js b/lib/postgres-listener.js
new file mode 100644 (file)
index 0000000..9810eed
--- /dev/null
@@ -0,0 +1,193 @@
+'use strict';
+
+const { fileScope } = require('@squeep/log-helper');
+
+const _fileScope = fileScope(__filename);
+const nop = () => undefined;
+
+/**
+ * @typedef {object} PostgresListenerOptions
+ * @property {string} channel notification channel to listen to
+ * @property {(payload: string, channel: string) => Promise<void>} dataCallback callback for incoming data
+ * @property {() => Promise<void>} connectionLostCallback callback for connection lost events
+ * @property {() => Promise<void>} connectionEstablishedCallback callback for connection established events
+ * @property {number} pingDelayMs delay between pings in milliseconds
+ * @property {number} reconnectDelayMs delay before attempting to reconnect in milliseconds
+ * @property {number} reconnectTimes number of times to attempt reconnection before giving up, -1 for infinity
+ */
+
+/**
+ * @type {PostgresListenerOptions}
+ */
+const defaultOptions = {
+  channel: 'cache_invalidation',
+  dataCallback: nop,
+  connectionLostCallback: nop,
+  connectionEstablishedCallback: nop,
+  pingDelayMs: 5000,
+  reconnectDelayMs: 6000,
+  reconnectTimes: -1, // infinite retries by default
+};
+
+/**
+ * Create a robust connection which listens to a notification channel.
+ */
+class PostgresListener {
+  constructor(logger, db, options) {
+    this.logger = logger;
+    this.db = db;
+
+    this.options = {
+      ...defaultOptions,
+      ...options,
+    };
+    this.notificationEventName = 'notification';
+
+    this.connection = null;
+    this.nextPingTimeout = undefined;
+
+    // bound versions of these methods, as we pass them around
+    this._onConnectionLostBound = this._onConnectionLost.bind(this);
+    this._onNotificationBound = this._onNotification.bind(this);
+  }
+
+
+  /**
+   * Establish the listener connection.
+   */
+  async start() {
+    await this._reconnect(0, 1); // initial connection will fail after first attempt instead of retrying
+    this._sendPing();
+  }
+
+
+  /**
+   * Shut down the listener connection.
+   */
+  async stop() {
+    const _scope = _fileScope('stop');
+    if (this.reconnectPending) {
+      this.logger.debug(_scope, 'overriding existing reconnect retry');
+      clearTimeout(this.reconnectPending);
+      delete this.reconnectPending;
+    }
+    if (this.nextPingTimeout) {
+      clearTimeout(this.nextPingTimeout);
+      this.nextPingTimeout = undefined;
+    }
+    if (this.connection) {
+      this.connection.client.removeListener(this.notificationEventName, this._onNotificationBound);
+      await this.connection.done();
+      this.connection = null;
+      await this.options.connectionLostCallback();
+    }
+  }
+
+
+  /**
+   * Begin sending connection pings.
+   */
+  _sendPing() {
+    const _scope = _fileScope('_sendPing');
+    this.nextPingTimeout = setTimeout(async () => {
+      try {
+        if (this.connection) {
+          await this.connection.none('NOTIFY $(channel:name), $(payload)', { channel: this.options.channel, payload: 'ping' });
+        }
+      } catch (e) {
+        this.logger.error(_scope, 'failed', { error: e });
+      } finally {
+        this._sendPing();
+      }
+    }, this.options.pingDelayMs);
+  }
+
+
+  /**
+   * Notify callback.
+   * @param {object} data listener notification
+   */
+  async _onNotification(data) {
+    const _scope = _fileScope('_onNotification');
+    this.logger.debug(_scope, 'called', data);
+
+    const { channel, payload } = data;
+    // Ignore our own messages
+    if (payload === 'ping') {
+      return;
+    }
+    await this.options.dataCallback(payload, channel);
+  }
+
+
+  /**
+   * Notify callback and attempt to reconnect.
+   * @param {*} error error
+   * @param {*} event event
+   */
+  async _onConnectionLost(error, event) {
+    const _scope = _fileScope('_onConnectionLost');
+    this.logger.error(_scope, 'listener connection lost', { error, event });
+    this.connection = null;
+    try {
+      event.client.removeListener(this.notificationEventName, this._onNotificationBound);
+    } catch (e) {
+      this.logger.error(_scope, 'failed to remove listener', { error: e });
+      // That's okay, it was probably just gone anyhow.
+    }
+    await this.options.connectionLostCallback();
+    try {
+      await this._reconnect(this.options.reconnectDelayMs, this.options.reconnectTimes);
+    } catch (e) {
+      this.logger.error(_scope, 'failed to reconnect listener', { error: e });
+    }
+  }
+
+
+  /**
+   * Schedule an attempt to establish a connection.
+   * @param {number} delay reconnect delay
+   * @param {number} retriesRemaining retry countdown
+   */
+  async _reconnect(delay, retriesRemaining) {
+    const _scope = _fileScope('_reconnect');
+    if (this.connection) {
+      this.logger.debug(_scope, 'closing existing connection');
+      this.connection.done();
+      this.connection = null;
+    }
+    if (this.reconnectPending) {
+      this.logger.debug(_scope, 'overriding existing reconnect retry');
+      clearTimeout(this.reconnectPending);
+    }
+    return new Promise((resolve, reject) => {
+      this.reconnectPending = setTimeout(async () => {
+        try {
+          delete this.reconnectPending;
+          this.connection = await this.db.connect({
+            direct: true,
+            onLost: this._onConnectionLostBound,
+          });
+          this.connection.client.on(this.notificationEventName, this._onNotificationBound);
+          await this.connection.none('LISTEN $(channel:name)', { channel: this.options.channel });
+          this.logger.debug(_scope, 'listener connection established');
+          await this.options.connectionEstablishedCallback();
+          resolve();
+        } catch (e) {
+          if (retriesRemaining == 0) {
+            return reject(e);
+          }
+          try {
+            await this._reconnect(delay, retriesRemaining - 1);
+            resolve();
+          } catch (e2) {
+            reject(e2);
+          }
+        }
+      }, delay);
+    });
+  }
+
+}
+
+module.exports = PostgresListener;
\ No newline at end of file
index 82c9ac084316d43addc41402a440591ee31b8352..636bff9a5f93015fd3cf56b34c657a1b2956318d 100644 (file)
@@ -4,14 +4,6 @@
 const { Abstract } = require('../');
 
 class AbstractIntegration extends Abstract {
-  constructor(...args) {
-    super(...args);
-    if (!this._isProduction) {
-      this._tableNames.push(...[
-        'almanac',
-      ]);
-    }
-  }
 
   // eslint-disable-next-line class-methods-use-this
   get schemaVersionMin() {
index fbc9d8a5fc9f42775c23a032108e0f3f552f0a5a..513551c4283f326c5c23b4f562c602e97ffc65f5 100644 (file)
@@ -3,6 +3,7 @@
 
 const Abstract = require('../abstract');
 const PostgresCreator = require('../../lib/postgres-creator');
+const PostgresListener = require('../../lib/postgres-listener');
 const DBErrors = require('../../lib/errors');
 const assert = require('node:assert');
 
@@ -13,40 +14,109 @@ class PostgresDB extends PostgresCreator(Abstract) {
 
   constructor(...args) {
     super(...args);
+    // Implementations should declare table names which integration tests will purge before running.
     if (!this._isProduction) {
       this._tableNames = ['almanac'];
     }
+    this.listener = new PostgresListener(this.logger, this.db, {
+      channel: 'almanac_changed',
+      dataCallback: this._listenerDataCallback.bind(this),
+      connectionLostCallback: this._listenerConnectionLostCallback.bind(this),
+      connectionEstablishedCallback: this._listenerConnectionEstablishedCallback.bind(this),
+    });
   }
 
+
   async initialize(sqlPath = __dirname, applyMigrations = true) {
     await super.initialize(sqlPath, applyMigrations);
+    await this.listener.start();
+  }
+
+
+  async _closeConnection() {
+    await this.listener.stop();
+    await super._closeConnection();
   }
 
-  async _engineSpecificTests() {
+
+  /**
+   * Integration test coverage for type handling and notification channels.
+   * @param {*} t mocha test context
+   */
+  async _engineSpecificTests(t) {
+    const _scope = '_engineSpecificTests';
     // cover bigint array types
     const result = await this.db.one('SELECT ARRAY[1, 2]::bigint[] as value');
     const expected = {
       value: [1n, 2n],
     };
     assert.deepStrictEqual(result, expected);
+
+    // Wait for listener pings
+    const pingMs = this.listener.options.pingDelayMs || 5000;;
+    t.timeout(pingMs * 3);
+    t.slow(pingMs * 2.5);
+    this.logger.debug(_scope, 'waiting 6s for listener ping to fire');
+    await new Promise((resolve) => setTimeout(resolve, pingMs * 1.2));
+  }
+
+
+  async _listenerConnectionEstablishedCallback() {
+    const _scope = 'listenerConnectionEstablishedCallback';
+    this.logger.debug(_scope, 'called');
+    this.cache = new Map(); // pretend it is a real cache
+  }
+
+
+  async _listenerConnectionLostCallback() {
+    const _scope = 'listenerConnectionLostCallback';
+    this.logger.debug(_scope, 'called');
+    delete this.cache;
+  }
+
+
+  async _listenerDataCallback(payload) {
+    const _scope = 'listenerDataCallback';
+    if (!this.cache) {
+      this.logger.error(_scope, 'no cache available');
+      return;
+    }
+    if (payload !== 'ping') {
+      this.logger.debug(_scope, 'called', { payload });
+      this.cache.delete(payload);
+    }
   }
 
+
   async almanacGetAll(dbCtx) {
     const _scope = 'almanacGetAll';
     this.logger.debug(_scope, 'called');
     try {
-      return await dbCtx.manyOrNone(this.statement.almanacGetAll);
+      // cache-invalidation over the set of all records is left as a challenge
+      const all = await dbCtx.manyOrNone(this.statement.almanacGetAll);
+      if (this.cache) {
+        for (const entry of all) {
+          this.cache.set(entry.event, entry.date);
+        }
+      }
+      return all;
     } catch (error) {
       this.logger.error(_scope, 'failed', { error });
       throw error;
     }
   }
 
+
   async almanacGet(dbCtx, event) {
     const _scope = 'almanacGet';
     this.logger.debug(_scope, 'called', { event });
     try {
+      if (this.cache?.has(event)) {
+        this.logger.debug(_scope, 'cache hit', { event });
+        return this.cache.get(event);
+      }
       const { date } = await dbCtx.oneOrNone(this.statement.almanacGet, { event });
+      this.cache?.set(event, date);
       return date;
     } catch (error) {
       this.logger.error(_scope, 'failed', { error, event });
@@ -54,6 +124,7 @@ class PostgresDB extends PostgresCreator(Abstract) {
     }
   }
 
+
   async almanacUpsert(dbCtx, event, date) {
     const _scope = 'almanacUpsert';
     this.logger.debug(_scope, 'called', { event, date });
index bedbf35383fe30671e3345acfeebee3e2860100a..98e55673def43392aeda39bdc6a3f9dfba22fee5 100644 (file)
@@ -5,3 +5,23 @@ CREATE TABLE almanac (
 COMMENT ON TABLE almanac IS $docstring$
 Notable events for service administration.
 $docstring$;
+
+CREATE OR REPLACE FUNCTION almanac_changed()
+RETURNS TRIGGER
+LANGUAGE plpgsql
+AS $$
+       DECLARE
+               payload varchar;
+       BEGIN
+               payload = CAST(NEW.event AS text);
+               PERFORM pg_notify('almanac_changed', payload);
+               RETURN NEW;
+       END;
+$$
+;
+
+CREATE OR REPLACE TRIGGER almanac_changed_trigger
+AFTER UPDATE OR DELETE ON almanac
+FOR EACH ROW
+       EXECUTE FUNCTION almanac_changed()
+;
\ No newline at end of file
index a72eb3160bde1a6cc5c9574265ff1321cc985416..1e515d3c822f6cc76228754f68d0470425088b9a 100644 (file)
@@ -88,7 +88,7 @@ describe('Database Integration', function () {
 
       describe('Engine Specific Tests', function () {
         it('does things', async function () {
-          await db._engineSpecificTests();
+          await db._engineSpecificTests(this);
         });
       }); // Engine Specific
 
@@ -96,7 +96,7 @@ describe('Database Integration', function () {
         let events, date1String, date2String;
         beforeEach(function () {
           date1String = 'Oct 28 2023 13:24 PDT';
-          date2String = '2023-09-29T09:58:00.000Z';        
+          date2String = '2023-09-29T09:58:00.000Z';
           events = [
             { event: 'event1', date: new Date(date1String) },
             { event: 'event2', date: new Date(date2String) },
@@ -109,29 +109,43 @@ describe('Database Integration', function () {
             await db.almanacUpsert(dbCtx, event, date);
           });
         });
-      
+
         step('fetch a record', async function () {
           const { event, date: expected } = events[0];
           await db.context(async (dbCtx) => {
             const date = await db.almanacGet(dbCtx, event);
             assert.deepStrictEqual(date, expected);
+            if (db.cache) {
+              assert(db.cache.has(event));
+            }
           });
         });
-      
+
+        step('fetch a cached record', async function () {
+          const { event, date: expected } = events[0];
+          await db.context(async (dbCtx) => {
+            const date = await db.almanacGet(dbCtx, event);
+            assert.deepStrictEqual(date, expected);
+            if (db.cache) {
+              assert(db.cache.has(event));
+            }
+          });
+        });
+
         step('add another record', async function () {
           const { event, date } = events[1];
           await db.context(async (dbCtx) => {
             await db.almanacUpsert(dbCtx, event, date);
           });
         });
-      
+
         step('fetch all records', async function () {
           const expected = events;
           await db.context(async (dbCtx) => {
             const allEvents = await db.almanacGetAll(dbCtx);
             assert.deepStrictEqual(allEvents, expected);
           });
-        });      
+        });
 
         step('transaction', async function () {
           const expected = events;
index a33e89b104ab47a078ab02b22f30310f2794c5b7..c8b071a975a866cf9d9dabd29c0159ee3b6e53a4 100644 (file)
@@ -64,6 +64,11 @@ describe('Postgres Creator', function () {
         db._pgpInitOptions.query(event);
         assert(db.logger.debug.called);
       });
+      it('does not log NOTIFY', function () {
+        const event = { query: 'NOTIFY ping' };
+        db._pgpInitOptions.query(event);
+        assert(!db.logger.debug.called);
+      });
     }); // query
     describe('receive', function () {
       it('covers', function () {
@@ -97,6 +102,13 @@ describe('Postgres Creator', function () {
         assert(db.logger.debug.called);
         assert.deepStrictEqual(data, expectedData);
       });
+      it('does not log NOTIFY', function () {
+        const data = [];
+        const result = { command: 'NOTIFY' };
+        const event = {};
+        db._pgpInitOptions.receive({ data, result, ctx: event });
+        assert(!db.logger.debug.called);
+      });
       it('covers no query logging', function () {
         delete options.db.queryLogLevel;
         db = new DatabasePostgres(stubLogger, options, stubPgp);
diff --git a/test/lib/postgres-listener.js b/test/lib/postgres-listener.js
new file mode 100644 (file)
index 0000000..0d69bd1
--- /dev/null
@@ -0,0 +1,208 @@
+/* eslint-env mocha */
+'use strict';
+
+const assert = require('node:assert');
+const sinon = require('sinon');
+const { StubLogger } = require('@squeep/test-helper');
+const Listener = require('../../lib/postgres-listener');
+
+const snooze = async (ms) => new Promise((resolve) => setTimeout(resolve, ms));
+const noExpectedException = 'did not get expected exception';
+
+describe('Postgres Listener', function () {
+  let listener, options, connectionStub, pgpStub, stubLogger;
+  beforeEach(function () {
+    stubLogger = new StubLogger(sinon);
+    connectionStub = {
+      client: {
+        on: sinon.stub(),
+        removeListener: sinon.stub(),
+      },
+      done: sinon.stub(),
+      none: sinon.stub(),
+    };
+    pgpStub = {
+      connect: sinon.stub().resolves(connectionStub),
+    };
+    options = {
+      dataCallback: sinon.stub(),
+      connectionLostCallback: sinon.stub(),
+      connectionEstablishedCallback: sinon.stub(),
+      pingDelayMs: 100,
+      reconnectDelayMs: 1000,
+      reconnectTimes: 1,
+    };
+    listener = new Listener(stubLogger, pgpStub, options);
+  });
+  afterEach(function () {
+    sinon.restore();
+  });
+
+  describe('start', function () {
+    it('covers', async function () {
+      sinon.stub(listener, '_reconnect').resolves();
+      sinon.stub(listener, '_sendPing').resolves();
+      await listener.start();
+      assert(listener._reconnect.called);
+      assert(listener._sendPing.called);
+    });
+  }); // start
+
+  describe('stop', function () {
+    it('covers not started', async function () {
+      await listener.stop();
+    });
+    it('cancels pending reconnect', async function() {
+      this.slow(300);
+      const pendingReconnect = sinon.stub();
+      listener.reconnectPending = setTimeout(pendingReconnect, 100);
+      await listener.stop();
+      await snooze(110);
+      assert(!pendingReconnect.called);
+    });
+    it('cancels pending ping', async function() {
+      this.slow(300);
+      const nextPing = sinon.stub();
+      listener.nextPingTimeout = setTimeout(nextPing, 100);
+      await listener.stop();
+      await snooze(110);
+      assert(!nextPing.called);
+    });
+
+    it('closes existing connection', async function () {
+      listener.connection = connectionStub;
+      await listener.stop();
+      assert(connectionStub.client.removeListener.called);
+      assert.strictEqual(listener.connection, null);
+      assert(options.connectionLostCallback.called);
+    });
+  }); // stop
+
+  describe('_reconnect', function () {
+    it('reconnects', async function () {
+      await listener._reconnect(0, 1);
+      assert(listener.connection);
+      assert(options.connectionEstablishedCallback.called);
+    });
+    it('closes existing connection before reconnecting', async function () {
+      const existingConnection = {
+        done: sinon.stub(),
+      };
+      listener.connection = existingConnection;
+      await listener._reconnect(0, 1);
+      assert(existingConnection.done.called);
+    });
+    it('overrides a pending reconnect', async function () {
+      this.slow(300);
+      const pendingReconnect = sinon.stub();
+      listener.reconnectPending = setTimeout(pendingReconnect, 100);
+      await listener._reconnect(0, 1);
+      await snooze(110);
+      assert(!pendingReconnect.called);
+    });
+    it('fails with no remaining retries', async function () {
+      const expected = new Error('foo');
+      pgpStub.connect = sinon.stub().rejects(expected);
+      try {
+        await listener._reconnect(0, 0);
+        assert.fail(noExpectedException);
+      } catch (e) {
+        assert.deepStrictEqual(e, expected);
+      }
+    });
+    it('fails all remaining retries', async function () {
+      const expected = new Error('foo');
+      pgpStub.connect = sinon.stub().rejects(expected);
+      try {
+        await listener._reconnect(0, 1);
+        assert.fail(noExpectedException);
+      } catch (e) {
+        assert.deepStrictEqual(e, expected);
+      }
+    });
+    it('fails first retry', async function () {
+      const expected = new Error('foo');
+      pgpStub.connect = sinon.stub().onCall(0).rejects(expected).resolves(connectionStub);
+      await listener._reconnect(0, 1);
+      assert(options.connectionEstablishedCallback.called);
+    });
+  }); // _reconnect
+
+  describe('_onConnectionLost', function () {
+    let error, event;
+    beforeEach(function () {
+      error = new Error('blah');
+      event = connectionStub;
+      sinon.stub(listener, '_reconnect');
+    });
+    it('success', async function () {
+      await listener._onConnectionLost(error, event);
+      assert.strictEqual(listener.connection, null);
+      assert(event.client.removeListener.called);
+      assert(listener.options.connectionLostCallback.called);
+      assert(listener._reconnect.called);
+    });
+    it('covers reconnect failure', async function () {
+      listener._reconnect.rejects(error);
+      await listener._onConnectionLost(error, event);
+      assert.strictEqual(listener.connection, null);
+      assert(event.client.removeListener.called);
+      assert(listener.options.connectionLostCallback.called);
+      assert(listener._reconnect.called);
+    });
+    it('covers listener removal failure', async function () {
+      event.client.removeListener.throws(error);
+      await listener._onConnectionLost(error, event);
+      assert.strictEqual(listener.connection, null);
+      assert(event.client.removeListener.called);
+      assert(listener.options.connectionLostCallback.called);
+      assert(listener._reconnect.called);
+    });
+  }); // _onConnectionLost
+
+  describe('_onNotification', function () {
+    it('sends data', async function () {
+      const data = {
+        payload: 'foo',
+      };
+      await listener._onNotification(data);
+      assert(listener.options.dataCallback.called);
+    });
+    it('ignores pings', async function () {
+      const data = {
+        payload: 'ping',
+      };
+      await listener._onNotification(data);
+      assert(!listener.options.dataCallback.called);
+    });
+  }); // _onNotification
+
+  describe('_sendPing', function () {
+    it('covers no connection', async function () {
+      this.slow(300);
+      await listener._sendPing();
+      await snooze(110);
+      clearTimeout(listener.nextPingTimeout);
+    });
+    it('success', async function () {
+      this.slow(300);
+      listener.connection = connectionStub;
+      await listener._sendPing();
+      await snooze(110);
+      clearTimeout(listener.nextPingTimeout);
+      assert(connectionStub.none.called);
+    });
+    it('covers error', async function () {
+      const err = new Error('blah');
+      this.slow(300);
+      listener.connection = connectionStub;
+      listener.connection.none.rejects(err);
+      await listener._sendPing();
+      await snooze(110);
+      clearTimeout(listener.nextPingTimeout);
+      assert(listener.connection.none.called);
+
+    });
+  }); // _sendPing
+
+}); // Postgres Listener