// initial commit
// [urlittler] / src / db / postgres / index.js
'use strict';

/**
 * pg-promise initialization options.
 * This object is handed to pg-promise below, and the PostgresDatabase
 * constructor later attaches `query`, `error` and `receive` handlers to
 * this same object, so it must stay mutable.
 */
const pgpInitOptions = {
  capSQL: true,
};

const path = require('path');
const pgp = require('pg-promise')(pgpInitOptions);
const BaseDatabase = require('../base');
const common = require('../../common');
const DBErrors = require('../errors');

// Factory for log scope labels: called with a method name, its result is
// passed as the first argument of every logger call in this file.
const _fileScope = common.fileScope(__filename);

/**
 * Default instance options, applied to each PostgresDatabase instance by
 * common.setOptions together with the caller-supplied options.
 */
const defaultOptions = {
  // Passed to pg-promise to create the database object.
  connectionString: undefined,
  // Name of the logger method used for query/result logging; falsy disables it.
  queryLogLevel: undefined,
  // Delay between keepalive NOTIFY pings on the listener connection.
  listenerPingDelayMs: 2000,
  // Delay before each reconnection attempt after the listener connection is lost.
  listenerRetryDelayMs: 5000,
  // Number of additional reconnection attempts after the first one fails.
  listenerRetryTimes: 10,
  // Channel name used for LISTEN and NOTIFY.
  listenerChannel: 'cache_invalidation',
  // Called with the payload of each notification received.
  listenerCallback: () => {},
  // Called when the listener connection is lost, before reconnecting.
  listenerLostCallback: () => {},
};
25
/**
 * PostgreSQL implementation of the database interface, built on pg-promise.
 *
 * Data accessors take an optional `dbCtx` (a pg-promise task or transaction
 * context); when it is falsy the root database object is used instead.
 */
class PostgresDatabase extends BaseDatabase {
  /**
   * @param {object} logger - logger exposing level-named methods (debug, error, ...)
   * @param {object} options - overrides for defaultOptions
   * @param {Function} _pgp - initialized pg-promise root; injectable for tests
   */
  constructor(logger, options, _pgp = pgp) {
    const _scope = _fileScope('constructor');

    super(logger);
    common.setOptions(this, defaultOptions, options);

    // NOTE(review): the connection string may embed credentials and is
    // written to the debug log here — confirm this is acceptable.
    this.logger.debug(_scope, 'connecting', { connectionString: this.connectionString });

    this.db = _pgp(this.connectionString);

    // The handlers below are attached to the module-level pgpInitOptions
    // object, so they are shared: constructing another instance replaces them.
    // NOTE(review): assumes pg-promise consults this object when an event
    // fires, since the handlers are assigned after initialization — confirm.
    if (this.queryLogLevel) {
      pgpInitOptions.query = (e) => {
        this.logger[this.queryLogLevel](_fileScope('query'), e.query, { params: e.params });
      };
    }

    // Errors are always logged at error level. queryLogLevel is undefined by
    // default, so it cannot be used to select the logger method here.
    pgpInitOptions.error = (err, e) => {
      this.logger.error(_fileScope('pgp'), '', { err, e });
    };

    // Rename result columns to camelCase in place. Only the first row is
    // inspected for column names; the rename is then applied to every row.
    pgpInitOptions.receive = (data, result, e) => {
      const exemplaryRow = data[0];
      for (const prop in exemplaryRow) {
        const camel = BaseDatabase._camelfy(prop);
        if (!(camel in exemplaryRow)) {
          for (const d of data) {
            // eslint-disable-next-line security/detect-object-injection
            d[camel] = d[prop];
            // eslint-disable-next-line security/detect-object-injection
            delete d[prop];
          }
        }
      }

      if (this.queryLogLevel) {
        this.logger[this.queryLogLevel](_fileScope('result'), e.query, PostgresDatabase._resultLog(result));
      }
    };
    this._initStatements(_pgp);
  }


  /**
   * Reduce a pg-promise result object to the fields worth logging.
   * @param {object} result
   * @returns {{command: *, rowCount: *, duration: *}}
   */
  static _resultLog(result) {
    return {
      command: result.command,
      rowCount: result.rowCount,
      duration: result.duration,
    };
  }


  /**
   * Load every SQL file under ./sql as a QueryFile into this.statement.
   * @param {object} _pgp - pg-promise root providing utils.enumSql and QueryFile
   * @throws the QueryFile's error when a file fails to load
   */
  _initStatements(_pgp) {
    const _scope = _fileScope('_initStatements');

    const qfOptions = {
      minify: true,
    };
    this.statement = _pgp.utils.enumSql(path.join(__dirname, 'sql'), {}, (file) => {
      const qf = new _pgp.QueryFile(file, qfOptions);
      this.logger.debug(_scope, file, { file: qf.file, options: qf.options, query: qf.query });
      if (qf.error) {
        this.logger.error(_scope, file, { error: qf.error });
        throw qf.error;
      }
      return qf;
    });

    this.logger.debug(_scope, 'complete', { statementCount: Object.keys(this.statement).length });
  }


  // WIP not used yet
  /**
   * Establish the listener connection (no delay, one retry) and start the
   * keepalive ping loop.
   */
  async _initListener() {
    await this._reconnectListener(0, 1);
    this._sendListenerNotifications();
  }


  /**
   * Schedule the next keepalive ping on the listener channel.
   * The loop always reschedules itself: while no listener connection exists
   * the ping is skipped, and pings resume once a connection is re-established.
   */
  _sendListenerNotifications() {
    const _scope = _fileScope('_sendListenerNotifications');
    setTimeout(async () => {
      try {
        if (this.listenerConnection) {
          await this.listenerConnection.none('NOTIFY $1~, $2', [this.listenerChannel, 'ping']);
        }
      } catch (e) {
        this.logger.error(_scope, 'failed', e);
      } finally {
        this._sendListenerNotifications();
      }
    }, this.listenerPingDelayMs);
  }


  /**
   * Handler for loss of the listener connection: detach the notification
   * handler from the dead client, inform the owner, then try to reconnect.
   * @param {*} err
   * @param {object} ev - event context; ev.client is the lost client
   */
  async _onListenerConnectionLost(err, ev) {
    const _scope = _fileScope('_onListenerConnectionLost');
    const eventName = 'notification';
    this.listenerConnection = null;
    this.logger.error(_scope, 'listener connection lost', { err, ev });
    ev.client.removeListener(eventName, this._onListenerNotificationBound);
    await this.listenerLostCallback();
    await this._reconnectListener(this.listenerRetryDelayMs, this.listenerRetryTimes);
    this.logger.debug(_scope, 'listener reconnected');
  }


  /**
   * Forward a received notification's payload to the configured callback.
   * @param {object} data - notification; only data.payload is used
   */
  async _onListenerNotification(data) {
    return await this.listenerCallback(data.payload);
  }


  /**
   * (Re)establish the dedicated listener connection after `delay` ms,
   * retrying up to `retriesRemaining` more times on failure.
   * @param {number} delay - milliseconds to wait before each attempt
   * @param {number} retriesRemaining - further attempts allowed after a failure
   * @returns {Promise<void>} resolves once listening; rejects with the last
   *   error when no retries remain
   */
  async _reconnectListener(delay, retriesRemaining) {
    const _scope = _fileScope('_reconnectListener');
    const eventName = 'notification';
    // Release any existing connection and cancel any pending attempt first.
    if (this.listenerConnection) {
      this.listenerConnection.done();
      this.listenerConnection = null;
    }
    if (this.listenerReconnectPending) {
      clearTimeout(this.listenerReconnectPending);
      delete this.listenerReconnectPending;
    }
    return new Promise((resolve, reject) => {
      this.listenerReconnectPending = setTimeout(async () => {
        try {
          this.listenerConnection = await this.db.connect({
            direct: true,
            onLost: this._onListenerConnectionLost.bind(this),
          });
          // Bind once and reuse, so removeListener can later be given the
          // same function reference.
          if (!this._onListenerNotificationBound) {
            this._onListenerNotificationBound = this._onListenerNotification.bind(this);
          }
          this.listenerConnection.client.on(eventName, this._onListenerNotificationBound);
          await this.listenerConnection.none('LISTEN $1~', this.listenerChannel);
          delete this.listenerReconnectPending;
          this.logger.debug(_scope, 'listener connection established');
          resolve();
        } catch (e) {
          if (retriesRemaining > 0) {
            try {
              await this._reconnectListener(delay, retriesRemaining - 1);
              resolve();
            } catch (e2) {
              reject(e2);
            }
          } else {
            reject(e);
          }
        }
      }, delay);
    });
  }


  /**
   * Summarize a pg-promise result of a modifying statement.
   * @param {object} result
   * @returns {{changes: *, lastInsertRowid: *, duration: *}} lastInsertRowid
   *   is the `id` of the first returned row, or undefined when no rows returned
   */
  // eslint-disable-next-line class-methods-use-this
  _postgresInfo(result) {
    return {
      changes: result.rowCount,
      lastInsertRowid: result.rows.length ? result.rows[0].id : undefined,
      duration: result.duration,
    };
  }


  /**
   * Run `fn` with a pg-promise task context.
   * @param {Function} fn - async (dbCtx) => result
   */
  async context(fn) {
    return await this.db.task(async (t) => await fn(t));
  }


  /**
   * Run `fn` through txIf on the given context (or the root database).
   * @param {object=} dbCtx
   * @param {Function} fn - async (txCtx) => result
   */
  async transaction(dbCtx, fn) {
    dbCtx = dbCtx || this.db;
    return await dbCtx.txIf(async (t) => await fn(t));
  }


  /**
   * Fetch an auth record by id.
   * @param {object=} dbCtx
   * @param {*} id
   * @returns {Promise<object|null>} the row, or null when none matches
   */
  async getAuthById(dbCtx, id) {
    const _scope = _fileScope('getAuthById');
    this.logger.debug(_scope, 'called', { id });

    dbCtx = dbCtx || this.db;

    const auth = await dbCtx.oneOrNone(this.statement.authGetById, { id });
    // NOTE(review): the whole auth row is written to the debug log — confirm
    // it carries no secret material.
    this.logger.debug(_scope, 'get', { auth });
    return auth;
  }


  /**
   * Map infinite epoch values onto finite numbers.
   * @param {*} epoch
   * @returns {*} Number.MAX_SAFE_INTEGER for Infinity, 0 for -Infinity,
   *   otherwise the value unchanged
   */
  static _epochFix(epoch) {
    switch (epoch) {
      case Infinity:
        return Number.MAX_SAFE_INTEGER;

      case -Infinity:
        return 0;

      default:
        return epoch;
    }
  }


  /**
   * Copy a link row with its timestamp fields passed through _epochFix.
   * @param {object=} link
   * @returns {object} a shallow copy with fixed created/lastAccess/expires;
   *   a falsy input is returned as-is
   */
  static _linkToNative(link) {
    return link && {
      ...link,
      created: PostgresDatabase._epochFix(link.created),
      lastAccess: PostgresDatabase._epochFix(link.lastAccess),
      expires: PostgresDatabase._epochFix(link.expires),
    };
  }


  /**
   * Insert or update a link via the linkUpsert statement.
   * @param {object=} dbCtx
   * @param {*} id
   * @param {*} url
   * @param {*} authToken
   * @returns {Promise<object>} summary from _postgresInfo
   */
  async insertLink(dbCtx, id, url, authToken) {
    const _scope = _fileScope('insertLink');
    // NOTE(review): authToken is written to the debug log — confirm this is
    // acceptable.
    this.logger.debug(_scope, 'called', { id, url, authToken });

    dbCtx = dbCtx || this.db;

    const result = await dbCtx.result(this.statement.linkUpsert, { id, url, authToken });
    this.logger.debug(_scope, 'result', PostgresDatabase._resultLog(result) );
    return this._postgresInfo(result);
  }


  /**
   * Fetch a link by id.
   * @param {object=} dbCtx
   * @param {*} id
   * @returns {Promise<object|null>}
   */
  async getLinkById(dbCtx, id) {
    const _scope = _fileScope('getLinkById');
    this.logger.debug(_scope, 'called', { id });

    dbCtx = dbCtx || this.db;

    const link = await dbCtx.oneOrNone(this.statement.linkGetById, { id });
    this.logger.debug(_scope, 'get', { link });
    return PostgresDatabase._linkToNative(link);
  }


  /**
   * Fetch a link by its URL.
   * @param {object=} dbCtx
   * @param {*} url
   * @returns {Promise<object|null>}
   */
  async getLinkByUrl(dbCtx, url) {
    const _scope = _fileScope('getLinkByUrl');
    this.logger.debug(_scope, 'called', { url });

    dbCtx = dbCtx || this.db;

    const link = await dbCtx.oneOrNone(this.statement.linkGetByUrl, { url });
    this.logger.debug(_scope, 'get', { link });
    return PostgresDatabase._linkToNative(link);
  }


  /**
   * Run the linkAccess statement for a link and return the resulting row.
   * @param {object=} dbCtx
   * @param {*} id
   * @returns {Promise<object|null>}
   */
  async accessLink(dbCtx, id) {
    const _scope = _fileScope('accessLink');
    this.logger.debug(_scope, 'called', { id });

    dbCtx = dbCtx || this.db;

    const link = await dbCtx.oneOrNone(this.statement.linkAccess, { id });
    this.logger.debug(_scope, 'get', { link });
    return PostgresDatabase._linkToNative(link);
  }


  /**
   * Set the expiration of a link via the linkExpire statement.
   * @param {object=} dbCtx
   * @param {*} id
   * @param {*} expires
   * @returns {Promise<object>} summary from _postgresInfo
   */
  async expireLink(dbCtx, id, expires) {
    const _scope = _fileScope('expireLink');
    this.logger.debug(_scope, 'called', { id, expires });

    dbCtx = dbCtx || this.db;

    const result = await dbCtx.result(this.statement.linkExpire, { expires, id });
    this.logger.debug(_scope, 'result', PostgresDatabase._resultLog(result) );
    return this._postgresInfo(result);
  }


  /**
   * Change the URL of a link via the linkUpdate statement.
   * @param {object=} dbCtx
   * @param {*} id
   * @param {*} url
   * @returns {Promise<object>} summary from _postgresInfo
   */
  async updateLink(dbCtx, id, url) {
    const _scope = _fileScope('updateLink');
    this.logger.debug(_scope, 'called', { id, url });

    dbCtx = dbCtx || this.db;

    const result = await dbCtx.result(this.statement.linkUpdate, { id, url });
    this.logger.debug(_scope, 'result', PostgresDatabase._resultLog(result) );
    return this._postgresInfo(result);
  }


  /**
   * Fetch every link.
   * @param {object=} dbCtx
   * @returns {Promise<object[]>}
   */
  async getAllLinks(dbCtx) {
    const _scope = _fileScope('getAllLinks');
    this.logger.debug(_scope, 'called', { });

    dbCtx = dbCtx || this.db;

    const links = await dbCtx.manyOrNone(this.statement.linkGetAll, { });
    this.logger.debug(_scope, 'get', { links });
    return links.map((l) => PostgresDatabase._linkToNative(l));
  }


}
310
// The class itself is the module's sole export.
module.exports = PostgresDatabase;