6ad387aa49ea2e11a91e29a6de60094eea4024aa
[websub-hub] / src / db / postgres / listener.js
1 'use strict';
2
3 const common = require('../../common');
4
5 const _fileScope = common.fileScope(__filename);
6
7
8 const defaultOptions = {
9 channel: 'cache_invalidation',
10 dataCallback: common.nop,
11 connectionLostCallback: common.nop,
12 connectionEstablishedCallback: common.nop,
13 pingDelayMs: 5000,
14 reconnectDelayMs: 6000,
15 reconnectTimes: 10,
16 };
17
18 /**
19 * Create a robust connection which listens to a notification channel.
20 */
21 class PostgresListener {
22 constructor(logger, db, options) {
23 this.logger = logger;
24 this.db = db;
25
26 this.options = Object.assign({}, defaultOptions, options);
27 this.notificationEventName = 'notification';
28
29 this.connection = null;
30 this.nextPingTimeout = undefined;
31
32 this._onConnectionLostBound = this._onConnectionLost.bind(this);
33 this._onNotificationBound = this._onNotification.bind(this);
34 }
35
36
37 /**
38 * Establish the listener connection.
39 */
40 async start() {
41 await this._reconnect(0, 1);
42 this._sendPing();
43 }
44
45
46 /**
47 * Shut down the listener connection.
48 */
49 async stop() {
50 const _scope = _fileScope('stop');
51 if (this.reconnectPending) {
52 this.logger.debug(_scope, 'overriding existing reconnect retry');
53 clearTimeout(this.reconnectPending);
54 delete this.reconnectPending;
55 }
56 if (this.connection) {
57 this.connection.client.removeListener(this.notificationEventName, this.onNotificationBound);
58 this.connection.done();
59 this.connection = null;
60 await this.options.connectionLostCallback();
61 }
62 }
63
64
65 /**
66 * Begin sending connection pings.
67 */
68 _sendPing() {
69 const _scope = _fileScope('_sendPing');
70 this.nextPingTimeout = setTimeout(async () => {
71 try {
72 if (this.connection) {
73 await this.connection.none('NOTIFY $(channel:name), $(payload)', { channel: this.options.channel, payload: 'ping' });
74 }
75 } catch (e) {
76 this.logger.error(_scope, 'failed', { error: e });
77 } finally {
78 this._sendPing();
79 }
80 }, this.options.pingDelayMs);
81 }
82
83
84 /**
85 * Notify callback.
86 * @param {Object} data
87 */
88 async _onNotification(data) {
89 const _scope = _fileScope('_onNotification');
90 // Ignore our own messages
91 if (data.payload === 'ping') {
92 return;
93 }
94 this.logger.debug(_scope, 'called', data);
95 await this.options.dataCallback(data.payload);
96 }
97
98
99 /**
100 * Notify callback and attempt to reconnect.
101 * @param {*} error
102 * @param {*} event
103 */
104 async _onConnectionLost(error, event) {
105 const _scope = _fileScope('_onConnectionLost');
106 this.logger.error(_scope, 'listener connection lost', { error, event });
107 this.connection = null;
108 try {
109 event.client.removeListener(this.notificationEventName, this.onNotificationBound);
110 } catch (e) {
111 this.logger.error(_scope, 'failed to remove listener', { error: e });
112 // That's okay, it was probably just gone anyhow.
113 }
114 await this.options.connectionLostCallback();
115 try {
116 await this._reconnect(this.options.reconnectDelayMs, this.options.reconnectTimes);
117 } catch (e) {
118 this.logger.error(_scope, 'failed to reconnect listener', { error: e });
119 }
120 }
121
122
123 /**
124 * Schedule an attempt to establish a connection.
125 * @param {Number} delay
126 * @param {Number} retriesRemaining
127 */
128 async _reconnect(delay, retriesRemaining) {
129 const _scope = _fileScope('_reconnect');
130 if (this.connection) {
131 this.logger.debug(_scope, 'closing existing connection');
132 this.connection.done();
133 this.connection = null;
134 }
135 if (this.reconnectPending) {
136 this.logger.debug(_scope, 'overriding existing reconnect retry');
137 clearTimeout(this.reconnectPending);
138 }
139 return new Promise((resolve, reject) => {
140 this.reconnectPending = setTimeout(async () => {
141 try {
142 delete this.reconnectPending;
143 this.connection = await this.db.connect({
144 direct: true,
145 onLost: this._onConnectionLostBound,
146 });
147 this.connection.client.on(this.notificationEventName, this._onNotificationBound);
148 await this.connection.none('LISTEN $(channel:name)', { channel: this.options.channel });
149 this.logger.debug(_scope, 'listener connection established');
150 await this.options.connectionEstablishedCallback();
151 resolve();
152 } catch (e) {
153 if (retriesRemaining <= 0) {
154 return reject(e);
155 }
156 try {
157 await this._reconnect(delay, retriesRemaining - 1);
158 resolve();
159 } catch (e2) {
160 reject(e2);
161 }
162 }
163 }, delay);
164 });
165 }
166
167 }
168
169 module.exports = PostgresListener;