Merge branch 'v1.3-dev'
[websub-hub] / src / db / postgres / listener.js
1 'use strict';
2
3 const common = require('../../common');
4
// Produces the scope label passed as the first argument to every logger call
// in this file; it is invoked with the name of the calling method.
const _fileScope = common.fileScope(__filename);


/**
 * Baseline listener settings; values supplied to the PostgresListener
 * constructor are spread over these, so any key may be overridden.
 */
const defaultOptions = {
  // Notification channel used for LISTEN and for the keep-alive NOTIFY.
  channel: 'cache_invalidation',

  // Invoked with the payload of each received notification (pings excluded).
  dataCallback: common.nop,

  // Invoked after the connection is lost or deliberately released.
  connectionLostCallback: common.nop,

  // Invoked once a connection is open and LISTEN has been issued.
  connectionEstablishedCallback: common.nop,

  // Delay, in milliseconds, between keep-alive pings.
  pingDelayMs: 5000,

  // Delay, in milliseconds, before each reconnect attempt after a loss.
  reconnectDelayMs: 6000,

  // Retry countdown handed to the reconnect logic after a loss.
  reconnectTimes: 10,
};
17
/**
 * Create a robust connection which listens to a notification channel.
 *
 * An instance holds at most one database connection on which it issues
 * LISTEN for `options.channel`. Received notification payloads are passed to
 * `options.dataCallback`; a 'ping' NOTIFY is sent on the same channel every
 * `options.pingDelayMs`; and when the connection is reported lost, reconnect
 * attempts are scheduled.
 */
class PostgresListener {
  /**
   * @param {object} logger object providing `debug` and `error` methods
   * @param {object} db database handle providing `connect()`
   *   (NOTE(review): usage looks like a pg-promise database — confirm)
   * @param {object=} options overrides merged over `defaultOptions`
   * @param {string=} options.channel notification channel name
   * @param {Function=} options.dataCallback called with each notification payload
   * @param {Function=} options.connectionLostCallback called when the connection is lost or released
   * @param {Function=} options.connectionEstablishedCallback called after LISTEN succeeds
   * @param {number=} options.pingDelayMs delay between pings
   * @param {number=} options.reconnectDelayMs delay before each reconnect attempt after a loss
   * @param {number=} options.reconnectTimes retry countdown used after a loss
   */
  constructor(logger, db, options) {
    this.logger = logger;
    this.db = db;

    this.options = {
      ...defaultOptions,
      ...options,
    };
    this.notificationEventName = 'notification';

    this.connection = null;
    this.nextPingTimeout = undefined;

    // Bound once so the very same function references can later be handed
    // to removeListener().
    this._onConnectionLostBound = this._onConnectionLost.bind(this);
    this._onNotificationBound = this._onNotification.bind(this);
  }


  /**
   * Establish the listener connection.
   * Makes an immediate connection attempt (with one retry), then begins the
   * periodic pings.
   * @returns {Promise<void>} rejects with the last connection error if both attempts fail
   */
  async start() {
    await this._reconnect(0, 1);
    this._sendPing();
  }


  /**
   * Shut down the listener connection.
   * Cancels the ping timer and any scheduled reconnect attempt, then, if a
   * connection is held, detaches the notification handler, releases the
   * connection and invokes `connectionLostCallback`.
   * @returns {Promise<void>}
   */
  async stop() {
    const _scope = _fileScope('stop');
    // Without this the ping timer would keep re-arming itself indefinitely
    // after shutdown, keeping the event loop alive.
    this._cancelPing();
    if (this.reconnectPending) {
      this.logger.debug(_scope, 'overriding existing reconnect retry');
      clearTimeout(this.reconnectPending);
      delete this.reconnectPending;
    }
    if (this.connection) {
      this.connection.client.removeListener(this.notificationEventName, this._onNotificationBound);
      this.connection.done();
      this.connection = null;
      await this.options.connectionLostCallback();
    }
  }


  /**
   * Cancel the pending ping timer, if any.
   * Clearing `nextPingTimeout` also tells a ping that is currently in flight
   * not to schedule a successor.
   */
  _cancelPing() {
    clearTimeout(this.nextPingTimeout);
    this.nextPingTimeout = undefined;
  }


  /**
   * Begin sending connection pings.
   * Schedules one ping after `pingDelayMs`; each completed ping schedules the
   * next. Calling this again replaces the current ping chain rather than
   * adding a second one.
   */
  _sendPing() {
    const _scope = _fileScope('_sendPing');
    this._cancelPing();
    const timer = setTimeout(async () => {
      try {
        // No connection (e.g. mid-reconnect): skip this ping but keep the chain going.
        if (this.connection) {
          await this.connection.none('NOTIFY $(channel:name), $(payload)', { channel: this.options.channel, payload: 'ping' });
        }
      } catch (e) {
        this.logger.error(_scope, 'failed', { error: e });
      } finally {
        // Only continue if this timer is still the current one, i.e. the
        // chain was neither cancelled nor replaced while the ping was awaited.
        if (this.nextPingTimeout === timer) {
          this._sendPing();
        }
      }
    }, this.options.pingDelayMs);
    this.nextPingTimeout = timer;
  }


  /**
   * Notify callback.
   * Passes the notification payload to `dataCallback`, except for this
   * class's own 'ping' payloads, which are dropped.
   * @param {object} data listener notification
   * @returns {Promise<void>}
   */
  async _onNotification(data) {
    const _scope = _fileScope('_onNotification');
    // Ignore our own messages
    if (data.payload === 'ping') {
      return;
    }
    this.logger.debug(_scope, 'called', data);
    await this.options.dataCallback(data.payload);
  }


  /**
   * Notify callback and attempt to reconnect.
   * Drops the current connection reference, detaches the notification
   * handler, invokes `connectionLostCallback`, then schedules reconnect
   * attempts using `reconnectDelayMs` and `reconnectTimes`. A final reconnect
   * failure is logged, not thrown.
   * @param {*} error error
   * @param {*} event event
   * @returns {Promise<void>}
   */
  async _onConnectionLost(error, event) {
    const _scope = _fileScope('_onConnectionLost');
    this.logger.error(_scope, 'listener connection lost', { error, event });
    this.connection = null;
    try {
      event.client.removeListener(this.notificationEventName, this._onNotificationBound);
    } catch (e) {
      this.logger.error(_scope, 'failed to remove listener', { error: e });
      // That's okay, it was probably just gone anyhow.
    }
    await this.options.connectionLostCallback();
    try {
      await this._reconnect(this.options.reconnectDelayMs, this.options.reconnectTimes);
    } catch (e) {
      this.logger.error(_scope, 'failed to reconnect listener', { error: e });
    }
  }


  /**
   * Schedule an attempt to establish a connection.
   * Any held connection is released first and any previously scheduled
   * attempt is cancelled. After `delay` ms a connection is opened, the
   * notification handler attached, LISTEN issued and
   * `connectionEstablishedCallback` invoked. If any of those steps fails,
   * another attempt is scheduled while `retriesRemaining` is above zero, so
   * up to `retriesRemaining + 1` attempts are made in total.
   *
   * NOTE: the promise of an attempt that gets cancelled (by a later
   * `_reconnect()` or by `stop()`) is never settled, because only the timer
   * callback resolves or rejects it.
   * @param {number} delay reconnect delay
   * @param {number} retriesRemaining retry countdown
   * @returns {Promise<void>} resolves once listening; rejects with the last error when retries are exhausted
   */
  async _reconnect(delay, retriesRemaining) {
    const _scope = _fileScope('_reconnect');
    if (this.connection) {
      this.logger.debug(_scope, 'closing existing connection');
      this.connection.done();
      this.connection = null;
    }
    if (this.reconnectPending) {
      this.logger.debug(_scope, 'overriding existing reconnect retry');
      clearTimeout(this.reconnectPending);
    }
    return new Promise((resolve, reject) => {
      this.reconnectPending = setTimeout(async () => {
        try {
          delete this.reconnectPending;
          this.connection = await this.db.connect({
            direct: true,
            onLost: this._onConnectionLostBound,
          });
          this.connection.client.on(this.notificationEventName, this._onNotificationBound);
          await this.connection.none('LISTEN $(channel:name)', { channel: this.options.channel });
          this.logger.debug(_scope, 'listener connection established');
          await this.options.connectionEstablishedCallback();
          resolve();
        } catch (e) {
          if (retriesRemaining <= 0) {
            return reject(e);
          }
          // Chain onto the next attempt so the caller's promise settles with
          // the outcome of the final try.
          try {
            await this._reconnect(delay, retriesRemaining - 1);
            resolve();
          } catch (e2) {
            reject(e2);
          }
        }
      }, delay);
    });
  }

}
171
172 module.exports = PostgresListener;