3 const pgpInitOptions
= {
7 const path
= require('path');
8 const pgp
= require('pg-promise')(pgpInitOptions
);
9 const BaseDatabase
= require('../base');
10 const common
= require('../../common');
11 const DBErrors
= require('../errors');
13 const _fileScope
= common
.fileScope(__filename
);
15 const defaultOptions
= {
16 connectionString: undefined,
17 queryLogLevel: undefined,
18 listenerPingDelayMs: 2000,
19 listenerRetryDelayMs: 5000,
20 listenerRetryTimes: 10,
21 listenerChannel: 'cache_invalidation',
22 listenerCallback: () => {},
23 listenerLostCallback: () => {},
26 class PostgresDatabase
extends BaseDatabase
{
27 constructor(logger
, options
, _pgp
= pgp
) {
28 const _scope
= _fileScope('constructor');
31 common
.setOptions(this, defaultOptions
, options
);
33 this.logger
.debug(_scope
, 'connecting', { connectionString: this.connectionString
});
35 this.db
= _pgp(this.connectionString
);
37 if (this.queryLogLevel
) {
38 pgpInitOptions
.query
= (e
) => {
39 this.logger
[this.queryLogLevel
](_fileScope('query'), e
.query
, { params: e
.params
});
43 pgpInitOptions
.error
= (err
, e
) => {
44 this.logger
[this.queryLogLevel
](_fileScope('pgp'), '', { err
, e
});
47 pgpInitOptions
.receive
= (data
, result
, e
) => {
48 const exemplaryRow
= data
[0];
49 for (const prop
in exemplaryRow
) {
50 const camel
= BaseDatabase
._camelfy(prop
);
51 if (!(camel
in exemplaryRow
)) {
52 for (const d
of data
) {
53 // eslint-disable-next-line security/detect-object-injection
55 // eslint-disable-next-line security/detect-object-injection
61 if (this.queryLogLevel
) {
62 this.logger
[this.queryLogLevel
](_fileScope('result'), e
.query
, PostgresDatabase
._resultLog(result
));
65 this._initStatements(_pgp
);
68 static _resultLog(result
) {
70 command: result
.commaand
,
71 rowCount: result
.rowCount
,
72 duration: result
.duration
,
76 _initStatements(_pgp
) {
77 const _scope
= _fileScope('_initStatements');
82 this.statement
= _pgp
.utils
.enumSql(path
.join(__dirname
, 'sql'), {}, (file
) => {
83 const qf
= new _pgp
.QueryFile(file
, qfOptions
);
84 this.logger
.debug(_scope
, file
, { file: qf
.file
, options: qf
.options
, query: qf
.query
});
86 this.logger
.error(_scope
, file
, { error: qf
.error
});
92 this.logger
.debug(_scope
, 'complete', { statementCount: Object
.keys(this.statement
).length
});
96 async
_initListener() {
97 await
this._reconnectListener(0, 1);
98 this._sendListenerNotifications();
100 _sendListenerNotifications() {
101 const _scope
= _fileScope('_sendListenerNotifications');
102 setTimeout(async () => {
103 if (this.listenerConnection
) {
105 await
this.listenerConnection
.none('NOTIFY $1~, $2', [this.listenerChannel
, 'ping']);
107 this.logger
.error(_scope
, 'failed', e
);
109 this._sendListenerNotifications();
112 }, this.listenerPingDelayMs
);
114 async
_onListenerConnectionLost(err
, ev
) {
115 const _scope
= _fileScope('_onConnectionLost');
116 const eventName
= 'notification';
117 this.listenerConnection
= null;
118 this.logger
.error(_scope
, 'listener connection lost', { err
, ev
});
119 ev
.client
.removeListener(eventName
, this._onListenerNotificationBound
);
120 await
this.listenerLostCallback();
121 await
this._reconnectListener(this.listenerRetryDelayMs
, this.listenerRetryTimes
);
122 this.logger
.debug(_scope
, 'listener reconnected');
124 async
_onListenerNotification(data
) {
125 return await
this.listenerCallback(data
.payload
);
127 async
_reconnectListener(delay
, retriesRemaining
) {
128 const _scope
= _fileScope('_reconnectListener');
129 const eventName
= 'notification';
130 if (this.listenerConnection
) {
131 this.listenerConnection
.done();
132 this.listenerConnection
= null;
134 if (this.listenerReconnectPending
) {
135 clearTimeout(this.listenerReconnectPending
);
136 delete this.listenerReconnectPending
;
138 return new Promise((resolve
, reject
) => {
139 this.listenerReconnectPending
= setTimeout(async () => {
141 this.listenerConnection
= await
this.db
.connect({
143 onLost: this._onListenerConnectionLost
.bind(this),
145 if (!this._onListenerNotificationBound
) {
146 this._onListenerNotificationBound
= this._onListenerNotification
.bind(this);
148 this.listenerConnection
.client
.on(eventName
, this._onListenerNotificationBound
);
149 await
this.listenerConnection
.none('LISTEN $1~', this.listenerChannel
);
150 delete this.listenerReconnectPending
;
151 this.logger
.debug(_scope
, 'listener connection established');
154 if (retriesRemaining
> 0) {
156 await
this._reconnectListener(delay
, retriesRemaining
- 1);
170 // eslint-disable-next-line class-methods-use-this
171 _postgresInfo(result
) {
173 changes: result
.rowCount
,
174 lastInsertRowid: result
.rows
.length
? result
.rows
[0].id : undefined,
175 duration: result
.duration
,
180 return await
this.db
.task(async (t
) => await
fn(t
));
184 async
transaction(dbCtx
, fn
) {
185 dbCtx
= dbCtx
|| this.db
;
186 return await dbCtx
.txIf(async (t
) => await
fn(t
));
190 async
getAuthById(dbCtx
, id
) {
191 const _scope
= _fileScope('getAuthById');
192 this.logger
.debug(_scope
, 'called', { id
});
194 dbCtx
= dbCtx
|| this.db
;
196 const auth
= await dbCtx
.oneOrNone(this.statement
.authGetById
, { id
});
197 this.logger
.debug(_scope
, 'get', { auth
});
202 static _epochFix(epoch
) {
205 return Number
.MAX_SAFE_INTEGER
;
216 static _linkToNative(link
) {
219 created: PostgresDatabase
._epochFix(link
.created
),
220 lastAccess: PostgresDatabase
._epochFix(link
.lastAccess
),
221 expires: PostgresDatabase
._epochFix(link
.expires
),
225 async
insertLink(dbCtx
, id
, url
, authToken
) {
226 const _scope
= _fileScope('insertLink');
227 this.logger
.debug(_scope
, 'called', { id
, url
, authToken
});
229 dbCtx
= dbCtx
|| this.db
;
231 const result
= await dbCtx
.result(this.statement
.linkUpsert
, { id
, url
, authToken
});
232 this.logger
.debug(_scope
, 'result', PostgresDatabase
._resultLog(result
) );
233 return this._postgresInfo(result
);
237 async
getLinkById(dbCtx
, id
) {
238 const _scope
= _fileScope('getLinkById');
239 this.logger
.debug(_scope
, 'called', { id
});
241 dbCtx
= dbCtx
|| this.db
;
243 const link
= await dbCtx
.oneOrNone(this.statement
.linkGetById
, { id
});
244 this.logger
.debug(_scope
, 'get', { link
});
245 return PostgresDatabase
._linkToNative(link
);
249 async
getLinkByUrl(dbCtx
, url
) {
250 const _scope
= _fileScope('getLinkByUrl');
251 this.logger
.debug(_scope
, 'called', { url
});
253 dbCtx
= dbCtx
|| this.db
;
255 const link
= await dbCtx
.oneOrNone(this.statement
.linkGetByUrl
, { url
});
256 this.logger
.debug(_scope
, 'get', { link
});
257 return PostgresDatabase
._linkToNative(link
);
261 async
accessLink(dbCtx
, id
) {
262 const _scope
= _fileScope('accessLink');
263 this.logger
.debug(_scope
, 'called', { id
});
265 dbCtx
= dbCtx
|| this.db
;
267 const link
= await dbCtx
.oneOrNone(this.statement
.linkAccess
, { id
});
268 this.logger
.debug(_scope
, 'get', { link
});
269 return PostgresDatabase
._linkToNative(link
);
273 async
expireLink(dbCtx
, id
, expires
) {
274 const _scope
= _fileScope('expireLink');
275 this.logger
.debug(_scope
, 'called', { id
, expires
});
277 dbCtx
= dbCtx
|| this.db
;
279 const result
= await dbCtx
.result(this.statement
.linkExpire
, { expires
, id
});
280 this.logger
.debug(_scope
, 'result', PostgresDatabase
._resultLog(result
) );
281 return this._postgresInfo(result
);
285 async
updateLink(dbCtx
, id
, url
) {
286 const _scope
= _fileScope('updateLink');
287 this.logger
.debug(_scope
, 'called', { id
, url
});
289 dbCtx
= dbCtx
|| this.db
;
291 const result
= await dbCtx
.result(this.statement
.linkUpdate
, { id
, url
});
292 this.logger
.debug(_scope
, 'result', PostgresDatabase
._resultLog(result
) );
293 return this._postgresInfo(result
);
297 async
getAllLinks(dbCtx
) {
298 const _scope
= _fileScope('getAllLinks');
299 this.logger
.debug(_scope
, 'called', { });
301 dbCtx
= dbCtx
|| this.db
;
303 const links
= await dbCtx
.manyOrNone(this.statement
.linkGetAll
, { });
304 this.logger
.debug(_scope
, 'get', { links
});
305 return links
.map((l
) => PostgresDatabase
._linkToNative(l
));
311 module
.exports
= PostgresDatabase
;