3 const common
= require('../../common');
5 const _fileScope
= common
.fileScope(__filename
);
8 const defaultOptions
= {
9 channel: 'cache_invalidation',
10 dataCallback: common
.nop
,
11 connectionLostCallback: common
.nop
,
12 connectionEstablishedCallback: common
.nop
,
14 reconnectDelayMs: 6000,
19 * Create a robust connection which listens to a notification channel.
21 class PostgresListener
{
22 constructor(logger
, db
, options
) {
26 this.options
= Object
.assign({}, defaultOptions
, options
);
27 this.notificationEventName
= 'notification';
29 this.connection
= null;
30 this.nextPingTimeout
= undefined;
32 this._onConnectionLostBound
= this._onConnectionLost
.bind(this);
33 this._onNotificationBound
= this._onNotification
.bind(this);
38 * Establish the listener connection.
41 await
this._reconnect(0, 1);
47 * Shut down the listener connection.
50 const _scope
= _fileScope('stop');
51 if (this.reconnectPending
) {
52 this.logger
.debug(_scope
, 'overriding existing reconnect retry');
53 clearTimeout(this.reconnectPending
);
54 delete this.reconnectPending
;
56 if (this.connection
) {
57 this.connection
.client
.removeListener(this.notificationEventName
, this.onNotificationBound
);
58 this.connection
.done();
59 this.connection
= null;
60 await
this.options
.connectionLostCallback();
66 * Begin sending connection pings.
69 const _scope
= _fileScope('_sendPing');
70 this.nextPingTimeout
= setTimeout(async () => {
72 if (this.connection
) {
73 await
this.connection
.none('NOTIFY $(channel:name), $(payload)', { channel: this.options
.channel
, payload: 'ping' });
76 this.logger
.error(_scope
, 'failed', { error: e
});
80 }, this.options
.pingDelayMs
);
86 * @param {Object} data
88 async
_onNotification(data
) {
89 const _scope
= _fileScope('_onNotification');
90 // Ignore our own messages
91 if (data
.payload
=== 'ping') {
94 this.logger
.debug(_scope
, 'called', data
);
95 await
this.options
.dataCallback(data
.payload
);
100 * Notify callback and attempt to reconnect.
104 async
_onConnectionLost(error
, event
) {
105 const _scope
= _fileScope('_onConnectionLost');
106 this.logger
.error(_scope
, 'listener connection lost', { error
, event
});
107 this.connection
= null;
109 event
.client
.removeListener(this.notificationEventName
, this.onNotificationBound
);
111 this.logger
.error(_scope
, 'failed to remove listener', { error: e
});
112 // That's okay, it was probably just gone anyhow.
114 await
this.options
.connectionLostCallback();
116 await
this._reconnect(this.options
.reconnectDelayMs
, this.options
.reconnectTimes
);
118 this.logger
.error(_scope
, 'failed to reconnect listener', { error: e
});
124 * Schedule an attempt to establish a connection.
125 * @param {Number} delay
126 * @param {Number} retriesRemaining
128 async
_reconnect(delay
, retriesRemaining
) {
129 const _scope
= _fileScope('_reconnect');
130 if (this.connection
) {
131 this.logger
.debug(_scope
, 'closing existing connection');
132 this.connection
.done();
133 this.connection
= null;
135 if (this.reconnectPending
) {
136 this.logger
.debug(_scope
, 'overriding existing reconnect retry');
137 clearTimeout(this.reconnectPending
);
139 return new Promise((resolve
, reject
) => {
140 this.reconnectPending
= setTimeout(async () => {
142 delete this.reconnectPending
;
143 this.connection
= await
this.db
.connect({
145 onLost: this._onConnectionLostBound
,
147 this.connection
.client
.on(this.notificationEventName
, this._onNotificationBound
);
148 await
this.connection
.none('LISTEN $(channel:name)', { channel: this.options
.channel
});
149 this.logger
.debug(_scope
, 'listener connection established');
150 await
this.options
.connectionEstablishedCallback();
153 if (retriesRemaining
<= 0) {
157 await
this._reconnect(delay
, retriesRemaining
- 1);
169 module
.exports
= PostgresListener
;