add caching for topic content db calls (Postgres only)
[websub-hub] / src / db / postgres / index.js
1 /* eslint-disable security/detect-object-injection */
2 'use strict';
3
// Options object handed to pg-promise when the library is initialized below.
// It is deliberately left mutable: the DatabasePostgres constructor attaches
// its query/error/receive hooks onto this same object.
const pgpInitOptions = { capSQL: true };
7
8 const path = require('path');
9 const pgp = require('pg-promise')(pgpInitOptions);
10 const svh = require('../schema-version-helper');
11 const Database = require('../base');
12 const DBErrors = require('../errors');
13 const Listener = require('./listener');
14 const common = require('../../common');
15
// Scope-label helper used for every log line emitted from this file.
const _fileScope = common.fileScope(__filename);

// Register type parsers so that INT8 (BIGINT) columns and INT8[] (BIGINT[])
// arrays are converted with BigInt.
const PGTypeIdINT8 = 20; // Type Id 20 == INT8 (BIGINT)
const PGTypeIdINT8Array = 1016; // Type Id 1016 == INT8[] (BIGINT[])
const pgTypes = pgp.pg.types;
pgTypes.setTypeParser(PGTypeIdINT8, BigInt);
// Wrap whichever array parser is currently registered, then convert each element.
const parseBigIntArray = pgTypes.getTypeParser(PGTypeIdINT8Array);
pgTypes.setTypeParser(PGTypeIdINT8Array, (a) => parseBigIntArray(a).map(BigInt));
23
// Inclusive range of schema versions this engine supports; passed to the
// schema-version helper when selecting which migrations to apply.
const schemaVersionsSupported = {
  min: { major: 1, minor: 0, patch: 0 },
  max: { major: 1, minor: 0, patch: 0 },
};
36
/**
 * PostgreSQL engine for the Database interface, implemented with pg-promise.
 *
 * SQL statements are loaded from the `sql` directory next to this file and
 * exposed on `this.statement`.  When `options.db.cacheEnabled` is set, a
 * Listener on the `topic_changed` channel maintains an in-memory cache of
 * topic content, keyed by topic id.
 *
 * Validation helpers (`_topicSetDataValidate`, etc.) and `_topicDefaults` are
 * not defined here; they are expected to be provided by the Database base class.
 */
class DatabasePostgres extends Database {
  /**
   * @param {object} logger logger instance, also handed to the Listener
   * @param {object} options reads `options.db.{connectionString,noWarnings,cacheEnabled,listener,queryLogLevel}`
   * @param {Function} _pgp initialized pg-promise instance (injectable for tests)
   */
  constructor(logger, options, _pgp = pgp) {
    super(logger, options);

    this.db = _pgp(options.db.connectionString);
    this.schemaVersionsSupported = schemaVersionsSupported;

    // Suppress QF warnings when running tests
    this.noWarnings = options.db.noWarnings;

    if (options.db.cacheEnabled) {
      this.listener = new Listener(logger, this.db, Object.assign({}, options.db.listener, {
        channel: 'topic_changed',
        dataCallback: this._topicChanged.bind(this),
        connectionEstablishedCallback: this._listenerEstablished.bind(this),
        connectionLostCallback: this._listenerLost.bind(this),
      }));
    }

    // Log queries
    const queryLogLevel = options.db.queryLogLevel;
    if (queryLogLevel) {
      pgpInitOptions.query = (event) => {
        // Quell outgoing pings
        if (event && event.query && event.query.startsWith('NOTIFY')) {
          return;
        }
        this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event, ['query', 'params']) });
      };
    }

    // Log errors
    pgpInitOptions.error = (err, event) => {
      this.logger.error(_fileScope('pgp:error'), '', { err, event });
    };

    // Deophidiate column names in-place, log results
    pgpInitOptions.receive = (data, result, event) => {
      // The first row decides which columns need renaming; the rename is
      // then applied to every row.
      const exemplaryRow = data[0];
      for (const prop in exemplaryRow) {
        const camel = Database._camelfy(prop);
        if (!(camel in exemplaryRow)) {
          for (const d of data) {
            d[camel] = d[prop];
            delete d[prop];
          }
        }
      }
      if (queryLogLevel) {
        // Quell outgoing pings
        if (result && result.command === 'NOTIFY') {
          return;
        }
        // Omitting .rows
        const resultLog = common.pick(result, ['command', 'rowCount', 'duration']);
        this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog });
      }
    };

    // Expose these for test coverage
    this.pgpInitOptions = pgpInitOptions;
    this._pgp = _pgp;

    this._initStatements(_pgp);
  }


  /**
   * Build a loader which turns a SQL file path into a pg-promise QueryFile.
   * @param {object} _pgp pg-promise instance providing `QueryFile`
   * @returns {Function} `(file) => QueryFile`; logs and throws `qf.error` if the file failed to load
   */
  _queryFileHelper(_pgp) {
    return (file) => {
      const _scope = _fileScope('_queryFile');
      /* istanbul ignore next */
      const qfParams = {
        minify: true,
        ...(this.noWarnings && { noWarnings: this.noWarnings }),
      };
      const qf = new _pgp.QueryFile(file, qfParams);
      if (qf.error) {
        this.logger.error(_scope, 'failed to create SQL statement', { error: qf.error, file });
        throw qf.error;
      }
      return qf;
    };
  }


  /**
   * Optionally apply schema migrations, run the base-class initialization,
   * then start the notification listener if one was configured.
   * @param {boolean} applyMigrations whether to run `_initTables` first
   */
  async initialize(applyMigrations = true) {
    const _scope = _fileScope('initialize');
    this.logger.debug(_scope, 'called', { applyMigrations });
    if (applyMigrations) {
      await this._initTables();
    }
    await super.initialize();
    if (this.listener) {
      await this.listener.start();
    }
  }


  /**
   * Ensure the schema version table exists (running `sql/schema/init.sql` if
   * it does not), then apply each wanted migration's `apply.sql` in order.
   * @param {object=} _pgp pg-promise instance; defaults to the one given to the constructor
   * @throws {DBErrors.UnexpectedResult} if the version table is still missing after init
   */
  async _initTables(_pgp) {
    const _scope = _fileScope('_initTables');
    this.logger.debug(_scope, 'called', {});

    const _queryFile = this._queryFileHelper(_pgp || this._pgp);

    // Migrations rely upon this table, ensure it exists.
    const metaVersionTable = '_meta_schema_version';

    const tableExists = async (name) => this.db.oneOrNone('SELECT table_name FROM information_schema.tables WHERE table_name=$(name)', { name });
    let metaExists = await tableExists(metaVersionTable);
    if (!metaExists) {
      const fPath = path.join(__dirname, 'sql', 'schema', 'init.sql');
      const initSql = _queryFile(fPath);
      const results = await this.db.multiResult(initSql);
      this.logger.debug(_scope, 'executed init sql', { results });
      metaExists = await tableExists(metaVersionTable);
      /* istanbul ignore if */
      if (!metaExists) {
        throw new DBErrors.UnexpectedResult(`did not create ${metaVersionTable} table`);
      }
      this.logger.info(_scope, 'created schema version table', { metaVersionTable });
    }

    // Apply migrations
    const currentSchema = await this._currentSchema();
    const migrationsWanted = svh.unappliedSchemaVersions(__dirname, currentSchema, this.schemaVersionsSupported);
    this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
    for (const v of migrationsWanted) {
      const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
      const migrationSql = _queryFile(fPath);
      // Migrations are applied one at a time, in the order given.
      const results = await this.db.multiResult(migrationSql);
      this.logger.debug(_scope, 'executed migration sql', { version: v, results });
      this.logger.info(_scope, 'applied migration', { version: v });
    }
  }


  /**
   * Load every SQL file under `sql/` into `this.statement`.
   * @param {object} _pgp pg-promise instance providing `utils.enumSql`
   */
  _initStatements(_pgp) {
    const _scope = _fileScope('_initStatements');
    const _queryFile = this._queryFileHelper(_pgp);
    this.statement = _pgp.utils.enumSql(path.join(__dirname, 'sql'), {}, _queryFile);
    this.logger.debug(_scope, 'statements initialized', { statements: Object.keys(this.statement).length });
  }


  /**
   * Acquire and immediately release a connection to confirm the server is reachable.
   * @returns {Promise<{serverVersion: *}>} server version as reported by the connected client
   */
  async healthCheck() {
    const _scope = _fileScope('healthCheck');
    this.logger.debug(_scope, 'called', {});
    const c = await this.db.connect();
    c.done();
    return { serverVersion: c.client.serverVersion };
  }


  /**
   * Fetch the highest schema version recorded in `_meta_schema_version`.
   * @returns {Promise<object>} row with `major`, `minor`, `patch`
   */
  async _currentSchema() {
    return this.db.one('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1');
  }


  /**
   * Stop the listener (if any) and shut down the pg-promise instance.
   * Failures are logged and re-thrown.
   */
  async _closeConnection() {
    const _scope = _fileScope('_closeConnection');
    try {
      if (this.listener) {
        await this.listener.stop();
      }
      await this._pgp.end();
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e });
      throw e;
    }
  }


  /**
   * Truncate the `topic` table with CASCADE.  Does nothing unless `really` is truthy.
   * @param {boolean} really safety switch
   */
  /* istanbul ignore next */
  async _purgeTables(really = false) {
    const _scope = _fileScope('_purgeTables');
    try {
      if (really) {
        await this.db.tx(async (t) => {
          await t.batch([
            'topic',
            // 'topic_fetch_in_progress',
            // 'verification',
            // 'verification_in_progress',
            // 'subscription',
            // 'subscription_delivery_in_progress',
          ].map(async (table) => t.query('TRUNCATE TABLE $(table:name) CASCADE', { table })));
        });
      }
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e });
      throw e;
    }
  }


  /**
   * Reduce a pg-promise result to engine-neutral information.
   * @param {object} result result with `rowCount`, `rows`, `duration`
   * @returns {{changes: *, lastInsertRowid: *, duration: *}} `lastInsertRowid` is the first row's `id`, or undefined when no rows were returned
   */
  // eslint-disable-next-line class-methods-use-this
  _engineInfo(result) {
    return {
      changes: result.rowCount,
      lastInsertRowid: result.rows.length ? result.rows[0].id : undefined,
      duration: result.duration,
    };
  }


  /**
   * Pick the loggable parts of a result, omitting its rows.
   * @param {object} result
   * @returns {object} subset containing `command`, `rowCount`, `duration`
   */
  // eslint-disable-next-line class-methods-use-this
  _resultLog(result) {
    return common.pick(result, ['command', 'rowCount', 'duration']);
  }


  /**
   * Receive notices when topic entry is updated.
   * Clear relevant cache entry.
   * 'ping' payloads are ignored.
   * @param {String} payload cache key of the changed topic
   */
  _topicChanged(payload) {
    const _scope = _fileScope('_topicChanged');
    if (payload !== 'ping') {
      this.logger.debug(_scope, 'called', { payload });
      // The cache only exists while the listener connection is established;
      // guard here the same way _cacheGet and _cacheSet do.
      if (this.cache) {
        this.cache.delete(payload);
      }
    }
  }


  /**
   * Called when a listener connection is opened.
   * Enable cache.
   */
  _listenerEstablished() {
    const _scope = _fileScope('_listenerEstablished');
    this.logger.debug(_scope, 'called', {});
    this.cache = new Map();
  }


  /**
   * Called when a listener connection is closed.
   * Disable cache.
   */
  _listenerLost() {
    const _scope = _fileScope('_listenerLost');
    this.logger.debug(_scope, 'called', {});
    delete this.cache;
  }


  /**
   * Return a cached entry, if available.
   * Updates the entry's hit counter and last-hit time.
   * @param {*} key
   * @returns {*} cached data, or undefined when caching is disabled or the key is absent
   */
  _cacheGet(key) {
    const _scope = _fileScope('_cacheGet');
    if (this.cache && this.cache.has(key)) {
      const cacheEntry = this.cache.get(key);
      this.logger.debug(_scope, 'found cache entry', { key, ...common.pick(cacheEntry, ['added', 'hits', 'lastHit']) });
      cacheEntry.hits += 1;
      cacheEntry.lastHit = new Date();
      return cacheEntry.data;
    }
  }


  /**
   * Store an entry in cache, if available.
   * @param {*} key
   * @param {*} data
   */
  _cacheSet(key, data) {
    const _scope = _fileScope('_cacheSet');
    if (this.cache) {
      this.cache.set(key, {
        added: new Date(),
        hits: 0,
        lastHit: undefined,
        data,
      });
      this.logger.debug(_scope, 'added cache entry', { key });
    }
  }


  /**
   * Run `fn` with a pg-promise task context.
   * @param {Function} fn `async (dbCtx) => result`
   * @returns {Promise<*>} whatever `fn` resolves to
   */
  async context(fn) {
    return this.db.task(async (t) => fn(t));
  }


  /**
   * Run `fn` via `dbCtx.txIf`, passing it the resulting context.
   * @param {object} dbCtx context obtained from `context`
   * @param {Function} fn `async (txCtx) => result`
   * @returns {Promise<*>} whatever `fn` resolves to
   */
  // eslint-disable-next-line class-methods-use-this
  async transaction(dbCtx, fn) {
    return dbCtx.txIf(async (t) => fn(t));
  }


  /**
   * Execute the `authenticationSuccess` statement for an identifier.
   * @param {object} dbCtx
   * @param {*} identifier
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async authenticationSuccess(dbCtx, identifier) {
    const _scope = _fileScope('authenticationSuccess');
    this.logger.debug(_scope, 'called', { identifier });

    let result;
    try {
      result = await dbCtx.result(this.statement.authenticationSuccess, { identifier });
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not update authentication success event');
      }
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, identifier });
      throw e;
    }
  }


  /**
   * Fetch the authentication row for an identifier.
   * @param {object} dbCtx
   * @param {*} identifier
   * @returns {Promise<object|null>} the row, or null when none matched
   */
  async authenticationGet(dbCtx, identifier) {
    const _scope = _fileScope('authenticationGet');
    this.logger.debug(_scope, 'called', { identifier });

    let auth;
    try {
      auth = await dbCtx.oneOrNone(this.statement.authenticationGet, { identifier });
      return auth;
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, identifier });
      throw e;
    }
  }


  /**
   * Execute the `authenticationUpsert` statement.
   * The credential is never logged; only a same-length run of asterisks is.
   * @param {object} dbCtx
   * @param {*} identifier
   * @param {String} credential
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async authenticationUpsert(dbCtx, identifier, credential) {
    const _scope = _fileScope('authenticationUpsert');
    const scrubbedCredential = '*'.repeat((credential || '').length);
    this.logger.debug(_scope, 'called', { identifier, scrubbedCredential });

    let result;
    try {
      result = await dbCtx.result(this.statement.authenticationUpsert, { identifier, credential });
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not upsert authentication');
      }
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential });
      throw e;
    }
  }


  /**
   * Fetch the rows returned by the `subscriptionsByTopicId` statement.
   * @param {object} dbCtx
   * @param {*} topicId
   * @returns {Promise<object[]>} zero or more rows
   */
  async subscriptionsByTopicId(dbCtx, topicId) {
    const _scope = _fileScope('subscriptionsByTopicId');
    this.logger.debug(_scope, 'called', { topicId });

    let count;
    try {
      count = await dbCtx.manyOrNone(this.statement.subscriptionsByTopicId, { topicId });
      return count;
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, topicId });
      throw e;
    }
  }


  /**
   * Fetch the single row returned by the `subscriptionCountByTopicUrl` statement.
   * @param {object} dbCtx
   * @param {*} topicUrl
   * @returns {Promise<object>} the row as returned by the statement
   */
  async subscriptionCountByTopicUrl(dbCtx, topicUrl) {
    const _scope = _fileScope('subscriptionCountByTopicUrl');
    this.logger.debug(_scope, 'called', { topicUrl });

    let count;
    try {
      count = await dbCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl });
      return count;
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, topicUrl });
      throw e;
    }
  }


  /**
   * Execute the `subscriptionDelete` statement.  No row count is enforced.
   * @param {object} dbCtx
   * @param {*} callback
   * @param {*} topicId
   * @returns {Promise<object>} engine info for the delete
   */
  async subscriptionDelete(dbCtx, callback, topicId) {
    const _scope = _fileScope('subscriptionDelete');
    this.logger.debug(_scope, 'called', { callback, topicId });

    try {
      const result = await dbCtx.result(this.statement.subscriptionDelete, { callback, topicId });
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, callback, topicId });
      throw e;
    }
  }


  /**
   * Execute the `subscriptionDeliveryClaim` statement within `txIf`.
   * @param {object} dbCtx
   * @param {*} wanted
   * @param {*} claimTimeoutSeconds
   * @param {*} claimant
   * @returns {Promise<Array>} the `id` of each returned row
   */
  async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
    const _scope = _fileScope('subscriptionDeliveryClaim');
    this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });

    try {
      const claims = await dbCtx.txIf(async (txCtx) => {
        return txCtx.manyOrNone(this.statement.subscriptionDeliveryClaim, { claimant, wanted, claimTimeoutSeconds });
      });
      return claims.map((r) => r.id);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, claimant, wanted, claimTimeoutSeconds });
      throw e;
    }
  }


  /**
   * Execute the `subscriptionDeliveryClaimById` statement within `txIf`.
   * @param {object} dbCtx
   * @param {*} subscriptionId
   * @param {*} claimTimeoutSeconds
   * @param {*} claimant
   * @returns {Promise<object>} engine info for the claim
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async subscriptionDeliveryClaimById(dbCtx, subscriptionId, claimTimeoutSeconds, claimant) {
    const _scope = _fileScope('subscriptionDeliveryClaimById');
    this.logger.debug(_scope, 'called', { subscriptionId, claimTimeoutSeconds, claimant });

    try {
      const result = await dbCtx.txIf(async (txCtx) => {
        const claimResult = await txCtx.result(this.statement.subscriptionDeliveryClaimById, { claimant, subscriptionId, claimTimeoutSeconds });
        if (claimResult.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not claim subscription delivery');
        }
        return claimResult;
      });
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, claimant, subscriptionId, claimTimeoutSeconds });
      throw e;
    }
  }


  /**
   * Within `txIf`, execute `subscriptionDeliverySuccess` and then
   * `subscriptionDeliveryDone` for a subscription.
   * @param {object} dbCtx
   * @param {*} callback
   * @param {*} topicId
   * @throws {DBErrors.UnexpectedResult} unless each statement affected exactly one row
   */
  async subscriptionDeliveryComplete(dbCtx, callback, topicId) {
    const _scope = _fileScope('subscriptionDeliveryComplete');
    this.logger.debug(_scope, 'called', { callback, topicId });

    try {
      await dbCtx.txIf(async (txCtx) => {
        let result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId });
        if (result.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
        }
        result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
        if (result.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not release subscription delivery');
        }
      });
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, callback, topicId });
      throw e;
    }
  }


  /**
   * Within `txIf`, execute the `subscriptionDelete` statement for a subscription.
   * @param {object} dbCtx
   * @param {*} callback
   * @param {*} topicId
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async subscriptionDeliveryGone(dbCtx, callback, topicId) {
    const _scope = _fileScope('subscriptionDeliveryGone');
    this.logger.debug(_scope, 'called', { callback, topicId });

    try {
      await dbCtx.txIf(async (txCtx) => {
        const result = await txCtx.result(this.statement.subscriptionDelete, { callback, topicId });
        if (result.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not delete subscription');
        }
        // Delete cascades to delivery, so no separate subscriptionDeliveryDone is issued.
      });
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, callback, topicId });
      throw e;
    }
  }


  /**
   * Within `txIf`: read the current attempt count, compute the next retry
   * delay from `retryDelays`, then execute `subscriptionDeliveryFailure` and
   * `subscriptionDeliveryDone`.
   * @param {object} dbCtx
   * @param {*} callback
   * @param {*} topicId
   * @param {Number[]} retryDelays passed to `common.attemptRetrySeconds`
   * @throws {DBErrors.UnexpectedResult} unless each update affected exactly one row
   */
  async subscriptionDeliveryIncomplete(dbCtx, callback, topicId, retryDelays = [60]) {
    const _scope = _fileScope('subscriptionDeliveryIncomplete');
    this.logger.debug(_scope, 'called', { callback, topicId, retryDelays });

    try {
      await dbCtx.txIf(async (txCtx) => {
        const { currentAttempt } = await txCtx.one(this.statement.subscriptionDeliveryAttempts, { callback, topicId });
        const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
        let result = await txCtx.result(this.statement.subscriptionDeliveryFailure, { nextAttemptDelaySeconds, callback, topicId });
        if (result.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not set subscription delivery failure');
        }
        result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
        if (result.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not release subscription delivery');
        }
      });
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, callback, topicId });
      throw e;
    }
  }


  /**
   * Fetch a subscription by callback and topic id.
   * @param {object} dbCtx
   * @param {*} callback
   * @param {*} topicId
   * @returns {Promise<object|null>} the row, or null when none matched
   */
  async subscriptionGet(dbCtx, callback, topicId) {
    const _scope = _fileScope('subscriptionGet');
    this.logger.debug(_scope, 'called', { callback, topicId });

    let subscription;
    try {
      subscription = await dbCtx.oneOrNone(this.statement.subscriptionGet, { callback, topicId });
      return subscription;
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, callback, topicId });
      throw e;
    }
  }


  /**
   * Fetch a subscription by its id.
   * @param {object} dbCtx
   * @param {*} subscriptionId
   * @returns {Promise<object|null>} the row, or null when none matched
   */
  async subscriptionGetById(dbCtx, subscriptionId) {
    const _scope = _fileScope('subscriptionGetById');
    this.logger.debug(_scope, 'called', { subscriptionId });

    let subscription;
    try {
      subscription = await dbCtx.oneOrNone(this.statement.subscriptionGetById, { subscriptionId });
      return subscription;
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, subscriptionId });
      throw e;
    }
  }


  /**
   * Validate a copy of `data`, then execute the `subscriptionUpdate` statement with it.
   * Validation runs before the try block, so validation failures are not logged here.
   * @param {object} dbCtx
   * @param {object} data statement parameters
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async subscriptionUpdate(dbCtx, data) {
    const _scope = _fileScope('subscriptionUpdate');
    this.logger.debug(_scope, 'called', { data });

    const subscriptionData = {
      ...data,
    };

    this._subscriptionUpdateDataValidate(subscriptionData);

    let result;
    try {
      result = await dbCtx.result(this.statement.subscriptionUpdate, subscriptionData);
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not update subscription');
      }
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, subscriptionData });
      throw e;
    }
  }


  /**
   * Validate and execute the `subscriptionUpsert` statement.
   * `secret`, `httpRemoteAddr` and `httpFrom` default to null when absent from `data`.
   * @param {object} dbCtx
   * @param {object} data statement parameters
   * @returns {Promise<object>} engine info for the upsert
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async subscriptionUpsert(dbCtx, data) {
    const _scope = _fileScope('subscriptionUpsert');
    this.logger.debug(_scope, 'called', { ...data });

    const subscriptionData = {
      secret: null,
      httpRemoteAddr: null,
      httpFrom: null,
      ...data,
    };
    this._subscriptionUpsertDataValidate(subscriptionData);

    let result;
    try {
      result = await dbCtx.result(this.statement.subscriptionUpsert, subscriptionData);
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not upsert subscription');
      }
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, subscriptionData });
      throw e;
    }
  }


  /**
   * Execute the `topicDeleted` statement for a topic.
   * @param {object} dbCtx
   * @param {*} topicId
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async topicDeleted(dbCtx, topicId) {
    const _scope = _fileScope('topicDeleted');
    this.logger.debug(_scope, 'called', { topicId });

    let result;
    try {
      result = await dbCtx.result(this.statement.topicDeleted, { topicId });
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not update topic as deleted');
      }
    } catch (e) {
      this.logger.error(_scope, 'failed to update topic as deleted', { error: e, topicId });
      throw e;
    }
  }


  /**
   * Execute the `topicContentFetchClaim` statement within `txIf`.
   * @param {object} dbCtx
   * @param {*} wanted
   * @param {*} claimTimeoutSeconds
   * @param {*} claimant
   * @returns {Promise<Array>} the `id` of each returned row
   */
  async topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
    const _scope = _fileScope('topicFetchClaim');
    this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });

    let claims;
    try {
      await dbCtx.txIf(async (txCtx) => {
        claims = await txCtx.manyOrNone(this.statement.topicContentFetchClaim, { claimant, wanted, claimTimeoutSeconds });
      });
      return claims.map((r) => r.id);
    } catch (e) {
      this.logger.error(_scope, 'failed to claim topics for fetch', { error: e, claimant, wanted, claimTimeoutSeconds });
      throw e;
    }
  }


  /**
   * Execute the `topicContentFetchClaimById` statement within `txIf`.
   * No row count is enforced; callers can inspect `changes` in the result.
   * @param {object} dbCtx
   * @param {*} topicId
   * @param {*} claimTimeoutSeconds
   * @param {*} claimant
   * @returns {Promise<object>} engine info for the claim
   */
  async topicFetchClaimById(dbCtx, topicId, claimTimeoutSeconds, claimant) {
    const _scope = _fileScope('topicFetchClaimById');
    this.logger.debug(_scope, 'called', { topicId, claimTimeoutSeconds, claimant });

    let result;
    try {
      await dbCtx.txIf(async (txCtx) => {
        result = await txCtx.result(this.statement.topicContentFetchClaimById, { topicId, claimant, claimTimeoutSeconds });
      });
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, topicId });
      throw e;
    }
  }


  /**
   * Within `txIf`, execute `topicAttemptsReset` and then `topicContentFetchDone`.
   * @param {object} dbCtx
   * @param {*} topicId
   * @returns {Promise<object>} engine info for the last statement executed
   * @throws {DBErrors.UnexpectedResult} unless each statement affected exactly one row
   */
  async topicFetchComplete(dbCtx, topicId) {
    const _scope = _fileScope('topicFetchComplete');
    this.logger.debug(_scope, 'called', { topicId });

    // Kept outside the callback so the failure log can report the last result seen.
    let result;
    try {
      await dbCtx.txIf(async (txCtx) => {
        result = await txCtx.result(this.statement.topicAttemptsReset, { topicId });
        if (result.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not reset topic attempts');
        }
        result = await txCtx.result(this.statement.topicContentFetchDone, { topicId });
        if (result.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not release topic fetch');
        }
      });
      this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, result, topicId });
      throw e;
    }
  }


  /**
   * Within `txIf`: read the attempts-since-success count, compute the next
   * retry delay from `retryDelays`, then execute `topicAttemptsIncrement` and
   * `topicContentFetchDone`.
   * @param {object} dbCtx
   * @param {*} topicId
   * @param {Number[]} retryDelays passed to `common.attemptRetrySeconds`
   * @returns {Promise<object>} engine info for the last statement executed
   * @throws {DBErrors.UnexpectedResult} unless each update affected exactly one row
   */
  async topicFetchIncomplete(dbCtx, topicId, retryDelays = [60]) {
    const _scope = _fileScope('topicFetchIncomplete');
    this.logger.debug(_scope, 'called', { topicId });

    // Assigned inside the callback as well, so the failure log can report the last result seen.
    let result;
    try {
      result = await dbCtx.txIf(async (txCtx) => {
        const { contentFetchAttemptsSinceSuccess: currentAttempt } = await txCtx.one(this.statement.topicAttempts, { topicId });
        const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
        result = await txCtx.result(this.statement.topicAttemptsIncrement, { topicId, nextAttemptDelaySeconds });
        if (result.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not set topic attempts');
        }
        result = await txCtx.result(this.statement.topicContentFetchDone, { topicId });
        if (result.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not release topic fetch');
        }
        return result;
      });
      this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, result, topicId });
      throw e;
    }
  }


  /**
   * Execute the `topicContentFetchRequested` statement for a topic.
   * @param {object} dbCtx
   * @param {*} topicId
   * @returns {Promise<object>} engine info for the update
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async topicFetchRequested(dbCtx, topicId) {
    const _scope = _fileScope('topicFetchRequested');
    this.logger.debug(_scope, 'called', { topicId });

    let result;
    try {
      result = await dbCtx.result(this.statement.topicContentFetchRequested, { topicId });
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not set topic fetch requested');
      }
      this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, topicId });
      throw e;
    }
  }


  /**
   * Fetch all rows from the `topicGetInfoAll` statement, each passed through `_topicDefaults`.
   * @param {object} dbCtx
   * @returns {Promise<object[]>}
   */
  async topicGetAll(dbCtx) {
    const _scope = _fileScope('topicGetAll');
    this.logger.debug(_scope, 'called');

    let topics;
    try {
      topics = await dbCtx.manyOrNone(this.statement.topicGetInfoAll);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, topics });
      throw e;
    }
    if (topics) {
      topics = topics.map(this._topicDefaults.bind(this));
    }
    return topics;
  }


  /**
   * Fetch a topic by id.
   * @param {object} dbCtx
   * @param {*} topicId
   * @param {boolean} applyDefaults whether to pass the row through `_topicDefaults`
   * @returns {Promise<object|null>} the row (null when none matched, subject to `_topicDefaults`)
   */
  async topicGetById(dbCtx, topicId, applyDefaults = true) {
    const _scope = _fileScope('topicGetById');
    this.logger.debug(_scope, 'called', { topicId });

    let topic;
    try {
      topic = await dbCtx.oneOrNone(this.statement.topicGetById, { topicId });
      if (applyDefaults) {
        topic = this._topicDefaults(topic);
      }
      return topic;
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, topic, topicId });
      throw e;
    }
  }


  /**
   * Fetch a topic by URL, passed through `_topicDefaults`.
   * @param {object} dbCtx
   * @param {*} topicUrl
   * @returns {Promise<*>} result of `_topicDefaults` on the row (or on null when none matched)
   */
  async topicGetByUrl(dbCtx, topicUrl) {
    const _scope = _fileScope('topicGetByUrl');
    this.logger.debug(_scope, 'called', { topicUrl });

    let topic;
    try {
      topic = await dbCtx.oneOrNone(this.statement.topicGetByUrl, { topicUrl });
      return this._topicDefaults(topic);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, topic, topicUrl });
      throw e;
    }
  }


  /**
   * Fetch a topic including its content, consulting the cache first.
   * A truthy cached value is returned without querying; otherwise the row is
   * queried, passed through `_topicDefaults`, and cached when caching is enabled.
   * @param {object} dbCtx
   * @param {*} topicId also used as the cache key
   * @returns {Promise<*>} topic with defaults applied
   */
  async topicGetContentById(dbCtx, topicId) {
    const _scope = _fileScope('topicGetContentById');
    this.logger.debug(_scope, 'called', { topicId });

    let topic;
    try {
      topic = this._cacheGet(topicId);
      if (topic) {
        return topic;
      }
      topic = await dbCtx.oneOrNone(this.statement.topicGetContentById, { topicId });
      const topicWithDefaults = this._topicDefaults(topic);
      // A falsy value could never satisfy the cache-hit check above, so
      // storing it would only leave behind an entry that is never used.
      if (topicWithDefaults) {
        this._cacheSet(topicId, topicWithDefaults);
      }
      return topicWithDefaults;
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, topic, topicId });
      throw e;
    }
  }


  /**
   * Validate and execute the `topicUpsert` statement.
   * `publisherValidationUrl` and the `leaseSeconds*` fields default to null when absent from `data`.
   * @param {object} dbCtx
   * @param {object} data statement parameters
   * @returns {Promise<object>} engine info for the upsert
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async topicSet(dbCtx, data) {
    const _scope = _fileScope('topicSet');
    this.logger.debug(_scope, 'called', data);

    const topicSetData = {
      publisherValidationUrl: null,
      leaseSecondsPreferred: null,
      leaseSecondsMin: null,
      leaseSecondsMax: null,
      ...data,
    };

    let result;
    try {
      this._topicSetDataValidate(topicSetData);
      result = await dbCtx.result(this.statement.topicUpsert, topicSetData);
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not set topic data');
      }
      this.logger.debug(_scope, 'success', { topicSetData, ...this._resultLog(result) });
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, result });
      throw e;
    }
  }


  /**
   * Validate and execute the `topicSetContent` statement.
   * `contentType` defaults to null when absent from `data`.
   * All log output uses a copy of the data whose `content` has been truncated.
   * @param {object} dbCtx
   * @param {object} data statement parameters, including `content`
   * @returns {Promise<object>} engine info for the update
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async topicSetContent(dbCtx, data) {
    const _scope = _fileScope('topicSetContent');
    const topicSetContentData = {
      contentType: null,
      ...data,
    };
    const logData = {
      ...topicSetContentData,
      content: common.logTruncate(topicSetContentData.content, 100),
    };
    // Log the truncated copy rather than the raw data, keeping the full
    // content out of the log.
    this.logger.debug(_scope, 'called', { ...logData });

    let result;
    try {
      this._topicSetContentDataValidate(topicSetContentData);
      result = await dbCtx.result(this.statement.topicSetContent, topicSetContentData);
      logData.result = this._resultLog(result);
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not set topic content');
      }
      this.logger.debug(_scope, 'success', { ...logData });
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, ...logData });
      throw e;
    }
  }


  /**
   * Validate and execute the `topicUpdate` statement.
   * The `leaseSeconds*` fields and `publisherValidationUrl` default to null when absent from `data`.
   * Validation runs before the try block, so validation failures are not logged here.
   * @param {object} dbCtx
   * @param {object} data statement parameters
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async topicUpdate(dbCtx, data) {
    const _scope = _fileScope('topicUpdate');
    this.logger.debug(_scope, 'called', { data });

    const topicData = {
      leaseSecondsPreferred: null,
      leaseSecondsMin: null,
      leaseSecondsMax: null,
      publisherValidationUrl: null,
      ...data,
    };

    this._topicUpdateDataValidate(topicData);

    let result;
    try {
      result = await dbCtx.result(this.statement.topicUpdate, topicData);
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not update topic');
      }
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, topicData });
      throw e;
    }
  }


  /**
   * Execute the `verificationClaim` statement within `txIf`.
   * @param {object} dbCtx
   * @param {*} wanted
   * @param {*} claimTimeoutSeconds
   * @param {*} claimant
   * @returns {Promise<Array>} the `id` of each returned row
   */
  async verificationClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
    const _scope = _fileScope('verificationClaim');
    this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });

    let result;
    try {
      await dbCtx.txIf(async (txCtx) => {
        result = await txCtx.manyOrNone(this.statement.verificationClaim, { claimant, wanted, claimTimeoutSeconds });
      });
      return result.map((r) => r.id);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, claimant, wanted, claimTimeoutSeconds });
      throw e;
    }
  }


  /**
   * Execute the `verificationClaimById` statement within `txIf`.
   * No row count is enforced; callers can inspect `changes` in the result.
   * @param {object} dbCtx
   * @param {*} verificationId
   * @param {*} claimTimeoutSeconds
   * @param {*} claimant
   * @returns {Promise<object>} engine info for the claim
   */
  async verificationClaimById(dbCtx, verificationId, claimTimeoutSeconds, claimant) {
    const _scope = _fileScope('verificationClaimById');
    this.logger.debug(_scope, 'called', { verificationId, claimant, claimTimeoutSeconds });

    let result;
    try {
      await dbCtx.txIf(async (txCtx) => {
        result = await txCtx.result(this.statement.verificationClaimById, { verificationId, claimant, claimTimeoutSeconds });
      });
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, verificationId, claimant, claimTimeoutSeconds });
      throw e;
    }
  }


  /**
   * Execute the `verificationScrub` statement within `txIf`.
   * @param {object} dbCtx
   * @param {*} verificationId
   * @param {*} callback
   * @param {*} topicId
   * @returns {Promise<object>} engine info for the scrub
   * @throws {DBErrors.UnexpectedResult} if no rows were affected
   */
  async verificationComplete(dbCtx, verificationId, callback, topicId) {
    const _scope = _fileScope('verificationComplete');
    this.logger.debug(_scope, 'called', { verificationId });

    let result;
    try {
      await dbCtx.txIf(async (txCtx) => {
        result = await txCtx.result(this.statement.verificationScrub, { verificationId, callback, topicId });
        // Unlike most updates here, more than one affected row is acceptable.
        if (result.rowCount < 1) {
          throw new DBErrors.UnexpectedResult('did not remove verifications');
        }
      });
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, verificationId });
      throw e;
    }
    return this._engineInfo(result);
  }


  /**
   * Fetch a verification by its id.
   * @param {object} dbCtx
   * @param {*} verificationId
   * @returns {Promise<object|null>} the row, or null when none matched
   */
  async verificationGetById(dbCtx, verificationId) {
    const _scope = _fileScope('verificationGetById');
    this.logger.debug(_scope, 'called', { verificationId });

    let verification;
    try {
      verification = await dbCtx.oneOrNone(this.statement.verificationGetById, { verificationId });
      return verification;
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, verificationId });
      throw e;
    }
  }


  /**
   * Within `txIf`: read the attempt count, compute the next retry delay from
   * `retryDelays`, then execute `verificationAttemptIncrement` and `verificationDone`.
   * @param {object} dbCtx
   * @param {*} verificationId
   * @param {Number[]} retryDelays passed to `common.attemptRetrySeconds`
   * @throws {DBErrors.UnexpectedResult} unless each update affected exactly one row
   */
  async verificationIncomplete(dbCtx, verificationId, retryDelays = [60]) {
    const _scope = _fileScope('verificationIncomplete');
    this.logger.debug(_scope, 'called', { verificationId });

    try {
      await dbCtx.txIf(async (txCtx) => {
        const { attempts } = await txCtx.one(this.statement.verificationAttempts, { verificationId });
        const nextAttemptDelaySeconds = common.attemptRetrySeconds(attempts, retryDelays);
        let result = await txCtx.result(this.statement.verificationAttemptIncrement, { verificationId, nextAttemptDelaySeconds });
        if (result.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not update verification attempts');
        }
        result = await txCtx.result(this.statement.verificationDone, { verificationId });
        if (result.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not release verification');
        }
      });
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, verificationId });
      throw e;
    }
  }


  /**
   * Validate and execute the `verificationInsert` statement.
   * `secret`, `httpRemoteAddr`, `httpFrom` and `requestId` default to null when absent.
   * @param {object} dbCtx
   * @param {object} verification statement parameters
   * @returns {Promise<*>} the `id` of the first returned row
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async verificationInsert(dbCtx, verification) {
    const _scope = _fileScope('verificationInsert');
    this.logger.debug(_scope, 'called', { verification });

    const verificationData = {
      secret: null,
      httpRemoteAddr: null,
      httpFrom: null,
      requestId: null,
      ...verification,
    };

    try {
      this._verificationDataValidate(verificationData);
      const result = await dbCtx.result(this.statement.verificationInsert, verificationData);
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not insert verification');
      }
      const verificationId = result.rows[0].id;
      this.logger.debug(_scope, 'inserted verification', { verificationId });

      return verificationId;
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, verificationData });
      throw e;
    }
  }


  /**
   * Execute the `verificationDone` statement for a verification.
   * @param {object} dbCtx
   * @param {*} verificationId
   * @returns {Promise<object>} engine info for the update
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async verificationRelease(dbCtx, verificationId) {
    const _scope = _fileScope('verificationRelease');
    this.logger.debug(_scope, 'called', { verificationId });

    let result;
    try {
      result = await dbCtx.result(this.statement.verificationDone, { verificationId });
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not release verification');
      }
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, verificationId });
      throw e;
    }
  }


  /**
   * Validate and execute the `verificationUpdate` statement.
   * `reason` defaults to null; a `verificationId` key in `data` overrides the argument.
   * @param {object} dbCtx
   * @param {*} verificationId
   * @param {object} data statement parameters
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async verificationUpdate(dbCtx, verificationId, data) {
    const _scope = _fileScope('verificationUpdate');
    this.logger.debug(_scope, 'called', { verificationId, data });

    const verificationData = {
      reason: null,
      verificationId,
      ...data,
    };

    let result;
    try {
      this._verificationUpdateDataValidate(verificationData);
      result = await dbCtx.result(this.statement.verificationUpdate, verificationData);
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not update verification');
      }
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, verificationData });
      throw e;
    }
  }


  /**
   * Execute the `verificationValidate` statement for a verification.
   * @param {object} dbCtx
   * @param {*} verificationId
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async verificationValidated(dbCtx, verificationId) {
    const _scope = _fileScope('verificationValidated');
    this.logger.debug(_scope, 'called', { verificationId });

    let result;
    try {
      result = await dbCtx.result(this.statement.verificationValidate, { verificationId });
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not set verification validation');
      }
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, verificationId });
      throw e;
    }
  }

}
1075
1076 module.exports = DatabasePostgres;