3 const common
= require('../../common');
5 const _fileScope
= common
.fileScope(__filename
);
8 const defaultOptions
= {
9 channel: 'cache_invalidation',
10 dataCallback: common
.nop
,
11 connectionLostCallback: common
.nop
,
12 connectionEstablishedCallback: common
.nop
,
14 reconnectDelayMs: 6000,
19 * Create a robust connection which listens to a notification channel.
21 class PostgresListener
{
22 constructor(logger
, db
, options
) {
30 this.notificationEventName
= 'notification';
32 this.connection
= null;
33 this.nextPingTimeout
= undefined;
35 this._onConnectionLostBound
= this._onConnectionLost
.bind(this);
36 this._onNotificationBound
= this._onNotification
.bind(this);
41 * Establish the listener connection.
44 await
this._reconnect(0, 1);
50 * Shut down the listener connection.
53 const _scope
= _fileScope('stop');
54 if (this.reconnectPending
) {
55 this.logger
.debug(_scope
, 'overriding existing reconnect retry');
56 clearTimeout(this.reconnectPending
);
57 delete this.reconnectPending
;
59 if (this.connection
) {
60 this.connection
.client
.removeListener(this.notificationEventName
, this._onNotificationBound
);
61 this.connection
.done();
62 this.connection
= null;
63 await
this.options
.connectionLostCallback();
69 * Begin sending connection pings.
72 const _scope
= _fileScope('_sendPing');
73 this.nextPingTimeout
= setTimeout(async () => {
75 if (this.connection
) {
76 await
this.connection
.none('NOTIFY $(channel:name), $(payload)', { channel: this.options
.channel
, payload: 'ping' });
79 this.logger
.error(_scope
, 'failed', { error: e
});
83 }, this.options
.pingDelayMs
);
89 * @param {object} data listener notification
91 async
_onNotification(data
) {
92 const _scope
= _fileScope('_onNotification');
93 // Ignore our own messages
94 if (data
.payload
=== 'ping') {
97 this.logger
.debug(_scope
, 'called', data
);
98 await
this.options
.dataCallback(data
.payload
);
103 * Notify callback and attempt to reconnect.
104 * @param {*} error error
105 * @param {*} event event
107 async
_onConnectionLost(error
, event
) {
108 const _scope
= _fileScope('_onConnectionLost');
109 this.logger
.error(_scope
, 'listener connection lost', { error
, event
});
110 this.connection
= null;
112 event
.client
.removeListener(this.notificationEventName
, this._onNotificationBound
);
114 this.logger
.error(_scope
, 'failed to remove listener', { error: e
});
115 // That's okay, it was probably just gone anyhow.
117 await
this.options
.connectionLostCallback();
119 await
this._reconnect(this.options
.reconnectDelayMs
, this.options
.reconnectTimes
);
121 this.logger
.error(_scope
, 'failed to reconnect listener', { error: e
});
127 * Schedule an attempt to establish a connection.
128 * @param {number} delay reconnect delay
129 * @param {number} retriesRemaining retry countdown
131 async
_reconnect(delay
, retriesRemaining
) {
132 const _scope
= _fileScope('_reconnect');
133 if (this.connection
) {
134 this.logger
.debug(_scope
, 'closing existing connection');
135 this.connection
.done();
136 this.connection
= null;
138 if (this.reconnectPending
) {
139 this.logger
.debug(_scope
, 'overriding existing reconnect retry');
140 clearTimeout(this.reconnectPending
);
142 return new Promise((resolve
, reject
) => {
143 this.reconnectPending
= setTimeout(async () => {
145 delete this.reconnectPending
;
146 this.connection
= await
this.db
.connect({
148 onLost: this._onConnectionLostBound
,
150 this.connection
.client
.on(this.notificationEventName
, this._onNotificationBound
);
151 await
this.connection
.none('LISTEN $(channel:name)', { channel: this.options
.channel
});
152 this.logger
.debug(_scope
, 'listener connection established');
153 await
this.options
.connectionEstablishedCallback();
156 if (retriesRemaining
<= 0) {
160 await
this._reconnect(delay
, retriesRemaining
- 1);
172 module
.exports
= PostgresListener
;