add caching for topic content db calls (Postgres only)
[websub-hub] / src / db / postgres / listener.js
diff --git a/src/db/postgres/listener.js b/src/db/postgres/listener.js
new file mode 100644 (file)
index 0000000..6ad387a
--- /dev/null
@@ -0,0 +1,169 @@
+'use strict';
+
+const common = require('../../common');
+
+const _fileScope = common.fileScope(__filename);
+
+
+const defaultOptions = {
+  channel: 'cache_invalidation',
+  dataCallback: common.nop,
+  connectionLostCallback: common.nop,
+  connectionEstablishedCallback: common.nop,
+  pingDelayMs: 5000,
+  reconnectDelayMs: 6000,
+  reconnectTimes: 10,
+};
+
+/**
+ * Create a robust connection which listens to a notification channel.
+ */
+class PostgresListener {
+  constructor(logger, db, options) {
+    this.logger = logger;
+    this.db = db;
+
+    this.options = Object.assign({}, defaultOptions, options);
+    this.notificationEventName = 'notification';
+
+    this.connection = null;
+    this.nextPingTimeout = undefined;
+
+    this._onConnectionLostBound = this._onConnectionLost.bind(this);
+    this._onNotificationBound = this._onNotification.bind(this);
+  }
+
+
+  /**
+   * Establish the listener connection.
+   */
+  async start() {
+    await this._reconnect(0, 1);
+    this._sendPing();
+  }
+
+
+  /**
+   * Shut down the listener connection.
+   */
+  async stop() {
+    const _scope = _fileScope('stop');
+    if (this.reconnectPending) {
+      this.logger.debug(_scope, 'overriding existing reconnect retry');
+      clearTimeout(this.reconnectPending);
+      delete this.reconnectPending;
+    }
+    if (this.connection) {
+      this.connection.client.removeListener(this.notificationEventName, this.onNotificationBound);
+      this.connection.done();
+      this.connection = null;
+      await this.options.connectionLostCallback();
+    }
+  }
+
+
+  /**
+   * Begin sending connection pings.
+   */
+  _sendPing() {
+    const _scope = _fileScope('_sendPing');
+    this.nextPingTimeout = setTimeout(async () => {
+      try {
+        if (this.connection) {
+          await this.connection.none('NOTIFY $(channel:name), $(payload)', { channel: this.options.channel, payload: 'ping' });
+        }
+      } catch (e) {
+        this.logger.error(_scope, 'failed', { error: e });
+      } finally {
+        this._sendPing();
+      }
+    }, this.options.pingDelayMs);
+  }
+
+
+  /**
+   * Notify callback.
+   * @param {Object} data
+   */
+  async _onNotification(data) {
+    const _scope = _fileScope('_onNotification');
+    // Ignore our own messages
+    if (data.payload === 'ping') {
+      return;
+    }
+    this.logger.debug(_scope, 'called', data);
+    await this.options.dataCallback(data.payload);
+  }
+
+
+  /**
+   * Notify callback and attempt to reconnect.
+   * @param {*} error
+   * @param {*} event
+   */
+  async _onConnectionLost(error, event) {
+    const _scope = _fileScope('_onConnectionLost');
+    this.logger.error(_scope, 'listener connection lost', { error, event });
+    this.connection = null;
+    try {
+      event.client.removeListener(this.notificationEventName, this.onNotificationBound);
+    } catch (e) {
+      this.logger.error(_scope, 'failed to remove listener', { error: e });
+      // That's okay, it was probably just gone anyhow.
+    }
+    await this.options.connectionLostCallback();
+    try {
+      await this._reconnect(this.options.reconnectDelayMs, this.options.reconnectTimes);
+    } catch (e) {
+      this.logger.error(_scope, 'failed to reconnect listener', { error: e });
+    }
+  }
+
+
+  /**
+   * Schedule an attempt to establish a connection.
+   * @param {Number} delay
+   * @param {Number} retriesRemaining
+   */
+  async _reconnect(delay, retriesRemaining) {
+    const _scope = _fileScope('_reconnect');
+    if (this.connection) {
+      this.logger.debug(_scope, 'closing existing connection');
+      this.connection.done();
+      this.connection = null;
+    }
+    if (this.reconnectPending) {
+      this.logger.debug(_scope, 'overriding existing reconnect retry');
+      clearTimeout(this.reconnectPending);
+    }
+    return new Promise((resolve, reject) => {
+      this.reconnectPending = setTimeout(async () => {
+        try {
+          delete this.reconnectPending;
+          this.connection = await this.db.connect({
+            direct: true,
+            onLost: this._onConnectionLostBound,
+          });
+          this.connection.client.on(this.notificationEventName, this._onNotificationBound);
+          await this.connection.none('LISTEN $(channel:name)', { channel: this.options.channel });
+          this.logger.debug(_scope, 'listener connection established');
+          await this.options.connectionEstablishedCallback();
+          resolve();
+        } catch (e) {
+          if (retriesRemaining <= 0) {
+            return reject(e);
+          }
+          try {
+            await this._reconnect(delay, retriesRemaining - 1);
+            resolve();
+          } catch (e2) {
+            reject(e2);
+          }
+        }
+      }, delay);
+    });
+  }
+
+}
+
+module.exports = PostgresListener;
\ No newline at end of file