213fa1031c3c44fd29c48a46a8af86ff8bed6217
[websub-hub] / src / db / postgres / index.js
1 /* eslint-disable security/detect-object-injection */
2 'use strict';
3
// Initialization options handed to pg-promise when the library is loaded.
// The DatabasePostgres constructor later attaches `query`, `error`, and
// `receive` handlers to this same object, so it must remain mutable.
const pgpInitOptions = { capSQL: true };
7
8 const path = require('path');
9 const pgp = require('pg-promise')(pgpInitOptions);
10 const svh = require('../schema-version-helper');
11 const Database = require('../base');
12 const DBErrors = require('../errors');
13 const Listener = require('./listener');
14 const common = require('../../common');
15
// Scope-label factory used to tag every log line emitted from this file.
const _fileScope = common.fileScope(__filename);

// PostgreSQL type ids for the 64-bit integer types.
const PGTypeIdINT8 = 20; // INT8 (BIGINT)
const PGTypeIdINT8Array = 1016; // INT8[] (BIGINT[])

// Surface BIGINT columns as native BigInt values.
pgp.pg.types.setTypeParser(PGTypeIdINT8, BigInt);

// Let the previously-registered array parser split the array text, then
// convert each element. SQL NULL elements are passed through untouched,
// because BigInt(null) would throw a TypeError.
const parseBigIntArray = pgp.pg.types.getTypeParser(PGTypeIdINT8Array);
pgp.pg.types.setTypeParser(PGTypeIdINT8Array, (a) => parseBigIntArray(a).map((v) => (v === null ? null : BigInt(v))));
23
// Schema version bounds for this engine. Exposed on each instance and passed
// to the schema-version-helper when selecting which migrations to apply.
const schemaVersionsSupported = {
  min: { major: 1, minor: 0, patch: 0 },
  max: { major: 1, minor: 0, patch: 1 },
};
36
/**
 * PostgreSQL engine for the hub database, built on pg-promise.
 *
 * SQL statements are loaded from the `sql` directory next to this file and
 * exposed as `this.statement.<name>`. Data-access methods take a `dbCtx`
 * (a pg-promise database, task, or transaction context), log failures with
 * their scope, and rethrow the original error.
 *
 * Validation helpers (`_*DataValidate`) and `_topicDefaults` are not defined
 * in this file; presumably they are inherited from the Database base class.
 */
class DatabasePostgres extends Database {
  /**
   * @param {Object} logger
   * @param {Object} options
   * @param {Object} options.db
   * @param {String} options.db.connectionString
   * @param {Boolean=} options.db.noWarnings suppress QueryFile warnings
   * @param {Boolean=} options.db.cacheEnabled create a listener for topic change notifications
   * @param {Object=} options.db.listener additional listener options
   * @param {String=} options.db.queryLogLevel logger method name used for query/result logging
   * @param {Function} _pgp initialized pg-promise library, replaceable for tests
   */
  constructor(logger, options, _pgp = pgp) {
    super(logger, options);

    this.db = _pgp(options.db.connectionString);
    this.schemaVersionsSupported = schemaVersionsSupported;

    // Suppress QF warnings when running tests
    this.noWarnings = options.db.noWarnings;

    if (options.db.cacheEnabled) {
      this.listener = new Listener(logger, this.db, Object.assign({}, options.db.listener, {
        channel: 'topic_changed',
        dataCallback: this._topicChanged.bind(this),
        connectionEstablishedCallback: this._listenerEstablished.bind(this),
        connectionLostCallback: this._listenerLost.bind(this),
      }));
    }

    // NOTE: the handlers below are attached to the module-level
    // pgpInitOptions object, so they are shared by (and bound to the most
    // recently constructed of) all instances in this process.

    // Log queries
    const queryLogLevel = options.db.queryLogLevel;
    if (queryLogLevel) {
      pgpInitOptions.query = (event) => {
        // Quell outgoing pings
        if (event && event.query && event.query.startsWith('NOTIFY')) {
          return;
        }
        this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event, ['query', 'params']) });
      };
    }

    // Log errors
    pgpInitOptions.error = (err, event) => {
      this.logger.error(_fileScope('pgp:error'), '', { err, event });
    };

    // Deophidiate column names in-place, log results
    pgpInitOptions.receive = (data, result, event) => {
      // The first row is taken as representative of every row's columns.
      const exemplaryRow = data[0];
      for (const prop in exemplaryRow) {
        const camel = Database._camelfy(prop);
        // Only rename when the camel-cased name is not already a column.
        if (!(camel in exemplaryRow)) {
          for (const d of data) {
            d[camel] = d[prop];
            delete d[prop];
          }
        }
      }
      if (queryLogLevel) {
        // Quell outgoing pings
        if (result && result.command === 'NOTIFY') {
          return;
        }
        // Omitting .rows
        this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...this._resultLog(result) });
      }
    };

    // Expose these for test coverage
    this.pgpInitOptions = pgpInitOptions;
    this._pgp = _pgp;

    this._initStatements(_pgp);
  }


  /**
   * Create a loader which turns a SQL file path into a pg-promise QueryFile.
   * @param {Object} _pgp pg-promise library providing QueryFile
   * @returns {Function} (file) => QueryFile; logs and throws the QueryFile error if loading failed
   */
  _queryFileHelper(_pgp) {
    return (file) => {
      const _scope = _fileScope('_queryFile');
      /* istanbul ignore next */
      const qfParams = {
        minify: true,
        ...(this.noWarnings && { noWarnings: this.noWarnings }),
      };
      const qf = new _pgp.QueryFile(file, qfParams);
      if (qf.error) {
        this.logger.error(_scope, 'failed to create SQL statement', { error: qf.error, file });
        throw qf.error;
      }
      return qf;
    };
  }


  /**
   * Prepare the database for use: optionally create tables and apply
   * migrations, run the base class initialization, and start the listener
   * if one was configured.
   * @param {Boolean} applyMigrations
   */
  async initialize(applyMigrations = true) {
    const _scope = _fileScope('initialize');
    this.logger.debug(_scope, 'called', { applyMigrations });
    if (applyMigrations) {
      await this._initTables();
    }
    await super.initialize();
    if (this.listener) {
      await this.listener.start();
    }
  }


  /**
   * Ensure the schema version table exists (running init.sql if not), then
   * apply each wanted migration in order.
   * @param {Object=} _pgp pg-promise library override; defaults to the one given at construction
   * @throws {DBErrors.UnexpectedResult} if init.sql did not create the version table
   */
  async _initTables(_pgp) {
    const _scope = _fileScope('_initTables');
    this.logger.debug(_scope, 'called', {});

    const _queryFile = this._queryFileHelper(_pgp || this._pgp);

    // Migrations rely upon this table, ensure it exists.
    const metaVersionTable = '_meta_schema_version';

    const tableExists = async (name) => this.db.oneOrNone('SELECT table_name FROM information_schema.tables WHERE table_name=$(name)', { name });
    let metaExists = await tableExists(metaVersionTable);
    if (!metaExists) {
      const fPath = path.join(__dirname, 'sql', 'schema', 'init.sql');
      const initSql = _queryFile(fPath);
      const results = await this.db.multiResult(initSql);
      this.logger.debug(_scope, 'executed init sql', { results });
      metaExists = await tableExists(metaVersionTable);
      /* istanbul ignore if */
      if (!metaExists) {
        throw new DBErrors.UnexpectedResult(`did not create ${metaVersionTable} table`);
      }
      this.logger.info(_scope, 'created schema version table', { metaVersionTable });
    }

    // Apply migrations
    const currentSchema = await this._currentSchema();
    const migrationsWanted = svh.unappliedSchemaVersions(__dirname, currentSchema, this.schemaVersionsSupported);
    this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
    for (const v of migrationsWanted) {
      const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
      const migrationSql = _queryFile(fPath);
      const results = await this.db.multiResult(migrationSql);
      this.logger.debug(_scope, 'executed migration sql', { version: v, results });
      this.logger.info(_scope, 'applied migration', { version: v });
    }
  }


  /**
   * Load every SQL file under the `sql` directory into `this.statement`.
   * @param {Object} _pgp pg-promise library providing utils.enumSql and QueryFile
   */
  _initStatements(_pgp) {
    const _scope = _fileScope('_initStatements');
    const _queryFile = this._queryFileHelper(_pgp);
    this.statement = _pgp.utils.enumSql(path.join(__dirname, 'sql'), {}, _queryFile);
    this.logger.debug(_scope, 'statements initialized', { statements: Object.keys(this.statement).length });
  }


  /**
   * Verify a connection can be obtained from the database.
   * @returns {Promise<Object>} { serverVersion }
   */
  async healthCheck() {
    const _scope = _fileScope('healthCheck');
    this.logger.debug(_scope, 'called', {});
    const c = await this.db.connect();
    try {
      // Read from the client before handing the connection back.
      return { serverVersion: c.client.serverVersion };
    } finally {
      c.done();
    }
  }


  /**
   * Fetch the highest schema version recorded in the version table.
   * @returns {Promise<Object>} row with major, minor, patch
   */
  async _currentSchema() {
    return this.db.one('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1');
  }


  /**
   * Stop the listener, if any, and shut down pg-promise.
   */
  async _closeConnection() {
    const _scope = _fileScope('_closeConnection');
    try {
      if (this.listener) {
        await this.listener.stop();
      }
      await this._pgp.end();
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e });
      throw e;
    }
  }


  /**
   * Truncate tables. Does nothing unless explicitly confirmed.
   * @param {Boolean} really must be true for any table to be truncated
   */
  /* istanbul ignore next */
  async _purgeTables(really = false) {
    const _scope = _fileScope('_purgeTables');
    try {
      if (really) {
        await this.db.tx(async (t) => {
          await t.batch([
            'topic',
            // 'topic_fetch_in_progress',
            // 'verification',
            // 'verification_in_progress',
            // 'subscription',
            // 'subscription_delivery_in_progress',
          ].map(async (table) => t.query('TRUNCATE TABLE $(table:name) CASCADE', { table })));
        });
      }
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e });
      throw e;
    }
  }


  /**
   * Reduce a pg-promise result to engine-neutral statement information.
   * @param {Object} result
   * @returns {Object} { changes, lastInsertRowid, duration }; lastInsertRowid is the first row's id, if any rows were returned
   */
  // eslint-disable-next-line class-methods-use-this
  _engineInfo(result) {
    return {
      changes: result.rowCount,
      lastInsertRowid: result.rows.length ? result.rows[0].id : undefined,
      duration: result.duration,
    };
  }


  /**
   * Pick the loggable summary fields of a result, leaving out the rows.
   * @param {Object} result
   * @returns {Object}
   */
  // eslint-disable-next-line class-methods-use-this
  _resultLog(result) {
    return common.pick(result, ['command', 'rowCount', 'duration']);
  }


  /**
   * Receive notices when topic entry is updated.
   * Clear relevant cache entry.
   * Pings are ignored, as is any notice arriving while no cache exists.
   * @param {String} payload
   */
  _topicChanged(payload) {
    const _scope = _fileScope('_topicChanged');
    if (payload !== 'ping') {
      this.logger.debug(_scope, 'called', { payload });
      if (this.cache) {
        this.cache.delete(payload);
      }
    }
  }


  /**
   * Called when a listener connection is opened.
   * Enable cache.
   */
  _listenerEstablished() {
    const _scope = _fileScope('_listenerEstablished');
    this.logger.debug(_scope, 'called', {});
    this.cache = new Map();
  }


  /**
   * Called when a listener connection is closed.
   * Disable cache.
   */
  _listenerLost() {
    const _scope = _fileScope('_listenerLost');
    this.logger.debug(_scope, 'called', {});
    delete this.cache;
  }


  /**
   * Return a cached entry, if available.
   * Updates the entry's hit statistics.
   * @param {*} key
   * @returns {*} cached data, or undefined when there is no cache or no entry
   */
  _cacheGet(key) {
    const _scope = _fileScope('_cacheGet');
    if (this.cache && this.cache.has(key)) {
      const cacheEntry = this.cache.get(key);
      this.logger.debug(_scope, 'found cache entry', { key, ...common.pick(cacheEntry, ['added', 'hits', 'lastHit']) });
      cacheEntry.hits += 1;
      cacheEntry.lastHit = new Date();
      return cacheEntry.data;
    }
  }


  /**
   * Store an entry in cache, if available.
   * @param {*} key
   * @param {*} data
   */
  _cacheSet(key, data) {
    const _scope = _fileScope('_cacheSet');
    if (this.cache) {
      this.cache.set(key, {
        added: new Date(),
        hits: 0,
        lastHit: undefined,
        data,
      });
      this.logger.debug(_scope, 'added cache entry', { key });
    }
  }


  /**
   * Run a function within a pg-promise task context.
   * @param {Function} fn async (dbCtx) => result
   * @returns {Promise<*>} whatever fn resolves to
   */
  async context(fn) {
    return this.db.task(async (t) => fn(t));
  }


  /**
   * Run a function within a transaction on the given context, via txIf.
   * @param {Object} dbCtx
   * @param {Function} fn async (txCtx) => result
   * @returns {Promise<*>} whatever fn resolves to
   */
  // eslint-disable-next-line class-methods-use-this
  async transaction(dbCtx, fn) {
    return dbCtx.txIf(async (t) => fn(t));
  }


  /**
   * Record a successful authentication for an identifier.
   * @param {Object} dbCtx
   * @param {String} identifier
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async authenticationSuccess(dbCtx, identifier) {
    const _scope = _fileScope('authenticationSuccess');
    this.logger.debug(_scope, 'called', { identifier });

    try {
      const result = await dbCtx.result(this.statement.authenticationSuccess, { identifier });
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not update authentication success event');
      }
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, identifier });
      throw e;
    }
  }


  /**
   * Fetch the authentication record for an identifier.
   * @param {Object} dbCtx
   * @param {String} identifier
   * @returns {Promise<Object|null>} the row, or null if none
   */
  async authenticationGet(dbCtx, identifier) {
    const _scope = _fileScope('authenticationGet');
    this.logger.debug(_scope, 'called', { identifier });

    try {
      return await dbCtx.oneOrNone(this.statement.authenticationGet, { identifier });
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, identifier });
      throw e;
    }
  }


  /**
   * Create or update the credential for an identifier.
   * The credential is masked in all log output.
   * @param {Object} dbCtx
   * @param {String} identifier
   * @param {String} credential
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async authenticationUpsert(dbCtx, identifier, credential) {
    const _scope = _fileScope('authenticationUpsert');
    const scrubbedCredential = '*'.repeat((credential || '').length);
    this.logger.debug(_scope, 'called', { identifier, scrubbedCredential });

    try {
      const result = await dbCtx.result(this.statement.authenticationUpsert, { identifier, credential });
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not upsert authentication');
      }
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential });
      throw e;
    }
  }


  /**
   * Fetch the rows returned by the subscriptionsByTopicId statement.
   * @param {Object} dbCtx
   * @param {*} topicId
   * @returns {Promise<Object[]>} possibly empty
   */
  async subscriptionsByTopicId(dbCtx, topicId) {
    const _scope = _fileScope('subscriptionsByTopicId');
    this.logger.debug(_scope, 'called', { topicId });

    try {
      const subscriptions = await dbCtx.manyOrNone(this.statement.subscriptionsByTopicId, { topicId });
      return subscriptions;
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, topicId });
      throw e;
    }
  }


  /**
   * Fetch the single row returned by the subscriptionCountByTopicUrl statement.
   * @param {Object} dbCtx
   * @param {String} topicUrl
   * @returns {Promise<Object>} row carrying the count
   */
  async subscriptionCountByTopicUrl(dbCtx, topicUrl) {
    const _scope = _fileScope('subscriptionCountByTopicUrl');
    this.logger.debug(_scope, 'called', { topicUrl });

    try {
      const count = await dbCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl });
      return count;
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, topicUrl });
      throw e;
    }
  }


  /**
   * Delete the subscription for a callback on a topic.
   * @param {Object} dbCtx
   * @param {String} callback
   * @param {*} topicId
   * @returns {Promise<Object>} engine info
   */
  async subscriptionDelete(dbCtx, callback, topicId) {
    const _scope = _fileScope('subscriptionDelete');
    this.logger.debug(_scope, 'called', { callback, topicId });

    try {
      const result = await dbCtx.result(this.statement.subscriptionDelete, { callback, topicId });
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, callback, topicId });
      throw e;
    }
  }


  /**
   * Run the subscriptionDeleteExpired statement for a topic.
   * @param {Object} dbCtx
   * @param {*} topicId
   * @returns {Promise<Object>} engine info
   */
  async subscriptionDeleteExpired(dbCtx, topicId) {
    const _scope = _fileScope('subscriptionDeleteExpired');
    this.logger.debug(_scope, 'called', { topicId });

    try {
      const result = await dbCtx.result(this.statement.subscriptionDeleteExpired, { topicId });
      this.logger.debug(_scope, 'success', { topicId, deleted: result.rowCount });
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, topicId });
      throw e;
    }
  }


  /**
   * Claim subscription deliveries for a claimant.
   * @param {Object} dbCtx
   * @param {Number} wanted
   * @param {Number} claimTimeoutSeconds
   * @param {String} claimant
   * @returns {Promise<Array>} ids of the claimed rows
   */
  async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
    const _scope = _fileScope('subscriptionDeliveryClaim');
    this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });

    try {
      const claims = await dbCtx.txIf(async (txCtx) => {
        return txCtx.manyOrNone(this.statement.subscriptionDeliveryClaim, { claimant, wanted, claimTimeoutSeconds });
      });
      return claims.map((r) => r.id);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, claimant, wanted, claimTimeoutSeconds });
      throw e;
    }
  }


  /**
   * Claim delivery of one specific subscription.
   * @param {Object} dbCtx
   * @param {*} subscriptionId
   * @param {Number} claimTimeoutSeconds
   * @param {String} claimant
   * @returns {Promise<Object>} engine info
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async subscriptionDeliveryClaimById(dbCtx, subscriptionId, claimTimeoutSeconds, claimant) {
    const _scope = _fileScope('subscriptionDeliveryClaimById');
    this.logger.debug(_scope, 'called', { subscriptionId, claimTimeoutSeconds, claimant });

    try {
      const result = await dbCtx.txIf(async (txCtx) => {
        const claimResult = await txCtx.result(this.statement.subscriptionDeliveryClaimById, { claimant, subscriptionId, claimTimeoutSeconds });
        if (claimResult.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not claim subscription delivery');
        }
        return claimResult;
      });
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, claimant, subscriptionId, claimTimeoutSeconds });
      throw e;
    }
  }


  /**
   * Mark a subscription delivery as successful and release its claim,
   * in one transaction.
   * @param {Object} dbCtx
   * @param {String} callback
   * @param {*} topicId
   * @throws {DBErrors.UnexpectedResult} unless each statement affected exactly one row
   */
  async subscriptionDeliveryComplete(dbCtx, callback, topicId) {
    const _scope = _fileScope('subscriptionDeliveryComplete');
    this.logger.debug(_scope, 'called', { callback, topicId });

    try {
      await dbCtx.txIf(async (txCtx) => {
        const successResult = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId });
        if (successResult.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
        }
        const doneResult = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
        if (doneResult.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not release subscription delivery');
        }
      });
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, callback, topicId });
      throw e;
    }
  }


  /**
   * Remove a subscription whose delivery target is gone.
   * @param {Object} dbCtx
   * @param {String} callback
   * @param {*} topicId
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async subscriptionDeliveryGone(dbCtx, callback, topicId) {
    const _scope = _fileScope('subscriptionDeliveryGone');
    this.logger.debug(_scope, 'called', { callback, topicId });

    try {
      await dbCtx.txIf(async (txCtx) => {
        const result = await txCtx.result(this.statement.subscriptionDelete, { callback, topicId });
        if (result.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not delete subscription');
        }
        // Delete cascades to delivery, so no separate release is issued.
      });
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, callback, topicId });
      throw e;
    }
  }


  /**
   * Record a failed subscription delivery, schedule the next attempt, and
   * release the claim, in one transaction.
   * @param {Object} dbCtx
   * @param {String} callback
   * @param {*} topicId
   * @param {Number[]} retryDelays passed with the current attempt count to common.attemptRetrySeconds
   * @throws {DBErrors.UnexpectedResult} unless each update affected exactly one row
   */
  async subscriptionDeliveryIncomplete(dbCtx, callback, topicId, retryDelays = [60]) {
    const _scope = _fileScope('subscriptionDeliveryIncomplete');
    this.logger.debug(_scope, 'called', { callback, topicId, retryDelays });

    try {
      await dbCtx.txIf(async (txCtx) => {
        const { currentAttempt } = await txCtx.one(this.statement.subscriptionDeliveryAttempts, { callback, topicId });
        const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
        const failureResult = await txCtx.result(this.statement.subscriptionDeliveryFailure, { nextAttemptDelaySeconds, callback, topicId });
        if (failureResult.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not set subscription delivery failure');
        }
        const doneResult = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
        if (doneResult.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not release subscription delivery');
        }
      });
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, callback, topicId });
      throw e;
    }
  }


  /**
   * Fetch the subscription for a callback on a topic.
   * @param {Object} dbCtx
   * @param {String} callback
   * @param {*} topicId
   * @returns {Promise<Object|null>} the row, or null if none
   */
  async subscriptionGet(dbCtx, callback, topicId) {
    const _scope = _fileScope('subscriptionGet');
    this.logger.debug(_scope, 'called', { callback, topicId });

    try {
      return await dbCtx.oneOrNone(this.statement.subscriptionGet, { callback, topicId });
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, callback, topicId });
      throw e;
    }
  }


  /**
   * Fetch a subscription by its id.
   * @param {Object} dbCtx
   * @param {*} subscriptionId
   * @returns {Promise<Object|null>} the row, or null if none
   */
  async subscriptionGetById(dbCtx, subscriptionId) {
    const _scope = _fileScope('subscriptionGetById');
    this.logger.debug(_scope, 'called', { subscriptionId });

    try {
      return await dbCtx.oneOrNone(this.statement.subscriptionGetById, { subscriptionId });
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, subscriptionId });
      throw e;
    }
  }


  /**
   * Update a subscription. The data is validated before the query is issued;
   * validation errors propagate without being logged here.
   * @param {Object} dbCtx
   * @param {Object} data parameters for the subscriptionUpdate statement
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async subscriptionUpdate(dbCtx, data) {
    const _scope = _fileScope('subscriptionUpdate');
    this.logger.debug(_scope, 'called', { data });

    const subscriptionData = {
      ...data,
    };

    this._subscriptionUpdateDataValidate(subscriptionData);

    try {
      const result = await dbCtx.result(this.statement.subscriptionUpdate, subscriptionData);
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not update subscription');
      }
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, subscriptionData });
      throw e;
    }
  }


  /**
   * Create or update a subscription. Optional fields default to null; the
   * data is validated before the query is issued.
   * @param {Object} dbCtx
   * @param {Object} data parameters for the subscriptionUpsert statement
   * @returns {Promise<Object>} engine info
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async subscriptionUpsert(dbCtx, data) {
    const _scope = _fileScope('subscriptionUpsert');
    this.logger.debug(_scope, 'called', { ...data });

    const subscriptionData = {
      secret: null,
      httpRemoteAddr: null,
      httpFrom: null,
      ...data,
    };
    this._subscriptionUpsertDataValidate(subscriptionData);

    try {
      const result = await dbCtx.result(this.statement.subscriptionUpsert, subscriptionData);
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not upsert subscription');
      }
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, subscriptionData });
      throw e;
    }
  }


  /**
   * Mark a topic as deleted.
   * @param {Object} dbCtx
   * @param {*} topicId
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async topicDeleted(dbCtx, topicId) {
    const _scope = _fileScope('topicDeleted');
    this.logger.debug(_scope, 'called', { topicId });

    try {
      const result = await dbCtx.result(this.statement.topicDeleted, { topicId });
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not update topic as deleted');
      }
    } catch (e) {
      this.logger.error(_scope, 'failed to update topic as deleted', { error: e, topicId });
      throw e;
    }
  }


  /**
   * Claim topics for content fetching.
   * @param {Object} dbCtx
   * @param {Number} wanted
   * @param {Number} claimTimeoutSeconds
   * @param {String} claimant
   * @returns {Promise<Array>} ids of the claimed rows
   */
  async topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
    const _scope = _fileScope('topicFetchClaim');
    this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });

    let claims;
    try {
      await dbCtx.txIf(async (txCtx) => {
        claims = await txCtx.manyOrNone(this.statement.topicContentFetchClaim, { claimant, wanted, claimTimeoutSeconds });
      });
      return claims.map((r) => r.id);
    } catch (e) {
      this.logger.error(_scope, 'failed to claim topics for fetch', { error: e, claimant, wanted, claimTimeoutSeconds });
      throw e;
    }
  }


  /**
   * Claim one specific topic for content fetching.
   * @param {Object} dbCtx
   * @param {*} topicId
   * @param {Number} claimTimeoutSeconds
   * @param {String} claimant
   * @returns {Promise<Object>} engine info
   */
  async topicFetchClaimById(dbCtx, topicId, claimTimeoutSeconds, claimant) {
    const _scope = _fileScope('topicFetchClaimById');
    this.logger.debug(_scope, 'called', { topicId, claimTimeoutSeconds, claimant });

    let result;
    try {
      await dbCtx.txIf(async (txCtx) => {
        result = await txCtx.result(this.statement.topicContentFetchClaimById, { topicId, claimant, claimTimeoutSeconds });
      });
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, topicId });
      throw e;
    }
  }


  /**
   * Reset a topic's fetch attempts and release its fetch claim, in one
   * transaction.
   * @param {Object} dbCtx
   * @param {*} topicId
   * @returns {Promise<Object>} engine info for the release statement
   * @throws {DBErrors.UnexpectedResult} unless each statement affected exactly one row
   */
  async topicFetchComplete(dbCtx, topicId) {
    const _scope = _fileScope('topicFetchComplete');
    this.logger.debug(_scope, 'called', { topicId });

    // Kept outside the transaction callback so the failure log can show the
    // most recent statement result.
    let result;
    try {
      await dbCtx.txIf(async (txCtx) => {
        result = await txCtx.result(this.statement.topicAttemptsReset, { topicId });
        if (result.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not reset topic attempts');
        }
        result = await txCtx.result(this.statement.topicContentFetchDone, { topicId });
        if (result.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not release topic fetch');
        }
      });
      this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, result, topicId });
      throw e;
    }
  }


  /**
   * Record a failed topic fetch, schedule the next attempt, and release the
   * fetch claim, in one transaction.
   * @param {Object} dbCtx
   * @param {*} topicId
   * @param {Number[]} retryDelays passed with the current attempt count to common.attemptRetrySeconds
   * @returns {Promise<Object>} engine info for the release statement
   * @throws {DBErrors.UnexpectedResult} unless each update affected exactly one row
   */
  async topicFetchIncomplete(dbCtx, topicId, retryDelays = [60]) {
    const _scope = _fileScope('topicFetchIncomplete');
    this.logger.debug(_scope, 'called', { topicId });

    // Also assigned inside the transaction callback so the failure log can
    // show the most recent statement result.
    let result;
    try {
      result = await dbCtx.txIf(async (txCtx) => {
        const { contentFetchAttemptsSinceSuccess: currentAttempt } = await txCtx.one(this.statement.topicAttempts, { topicId });
        const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
        result = await txCtx.result(this.statement.topicAttemptsIncrement, { topicId, nextAttemptDelaySeconds });
        if (result.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not set topic attempts');
        }
        result = await txCtx.result(this.statement.topicContentFetchDone, { topicId });
        if (result.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not release topic fetch');
        }
        return result;
      });
      this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, result, topicId });
      throw e;
    }
  }


  /**
   * Flag a topic as needing a content fetch.
   * @param {Object} dbCtx
   * @param {*} topicId
   * @returns {Promise<Object>} engine info
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async topicFetchRequested(dbCtx, topicId) {
    const _scope = _fileScope('topicFetchRequested');
    this.logger.debug(_scope, 'called', { topicId });

    try {
      const result = await dbCtx.result(this.statement.topicContentFetchRequested, { topicId });
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not set topic fetch requested');
      }
      this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, topicId });
      throw e;
    }
  }


  /**
   * Fetch all topics, each passed through _topicDefaults.
   * @param {Object} dbCtx
   * @returns {Promise<Object[]>}
   */
  async topicGetAll(dbCtx) {
    const _scope = _fileScope('topicGetAll');
    this.logger.debug(_scope, 'called');

    let topics;
    try {
      topics = await dbCtx.manyOrNone(this.statement.topicGetInfoAll);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e });
      throw e;
    }
    if (topics) {
      topics = topics.map(this._topicDefaults.bind(this));
    }
    return topics;
  }


  /**
   * Fetch a topic by id.
   * @param {Object} dbCtx
   * @param {*} topicId
   * @param {Boolean} applyDefaults whether to pass the row through _topicDefaults
   * @returns {Promise<Object|null>}
   */
  async topicGetById(dbCtx, topicId, applyDefaults = true) {
    const _scope = _fileScope('topicGetById');
    this.logger.debug(_scope, 'called', { topicId });

    let topic;
    try {
      topic = await dbCtx.oneOrNone(this.statement.topicGetById, { topicId });
      if (applyDefaults) {
        topic = this._topicDefaults(topic);
      }
      return topic;
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, topic, topicId });
      throw e;
    }
  }


  /**
   * Fetch a topic by its URL, passed through _topicDefaults.
   * @param {Object} dbCtx
   * @param {String} topicUrl
   * @returns {Promise<*>} result of _topicDefaults on the row (or on null if none)
   */
  async topicGetByUrl(dbCtx, topicUrl) {
    const _scope = _fileScope('topicGetByUrl');
    this.logger.debug(_scope, 'called', { topicUrl });

    let topic;
    try {
      topic = await dbCtx.oneOrNone(this.statement.topicGetByUrl, { topicUrl });
      return this._topicDefaults(topic);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, topic, topicUrl });
      throw e;
    }
  }


  /**
   * Fetch a topic including its content, by id.
   * A truthy cached value is returned without querying; otherwise the row is
   * fetched, passed through _topicDefaults, and offered to the cache.
   * @param {Object} dbCtx
   * @param {*} topicId
   * @returns {Promise<*>}
   */
  async topicGetContentById(dbCtx, topicId) {
    const _scope = _fileScope('topicGetContentById');
    this.logger.debug(_scope, 'called', { topicId });

    let topic;
    try {
      topic = this._cacheGet(topicId);
      if (topic) {
        return topic;
      }
      topic = await dbCtx.oneOrNone(this.statement.topicGetContentById, { topicId });
      const topicWithDefaults = this._topicDefaults(topic);
      this._cacheSet(topicId, topicWithDefaults);
      return topicWithDefaults;
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, topic, topicId });
      throw e;
    }
  }


  /**
   * Delete a topic, but only if it is flagged deleted and has no
   * subscribers. Otherwise nothing is changed.
   * @param {Object} dbCtx
   * @param {*} topicId
   * @throws {DBErrors.UnexpectedResult} if the delete did not affect exactly one row
   */
  async topicPendingDelete(dbCtx, topicId) {
    const _scope = _fileScope('topicPendingDelete');
    this.logger.debug(_scope, 'called', { topicId });

    try {
      await dbCtx.txIf(async (txCtx) => {
        const topic = await txCtx.one(this.statement.topicGetById, { topicId });
        if (!topic.isDeleted) {
          this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
          return;
        }

        const { count: subscriberCount } = await txCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl: topic.url });
        if (subscriberCount) {
          this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
          return;
        }

        const result = await txCtx.result(this.statement.topicDeleteById, { topicId });
        if (result.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not delete topic');
        }
      });
      this.logger.debug(_scope, 'success', { topicId });
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, topicId });
      throw e;
    }
  }


  /**
   * Create or update a topic. Optional fields default to null; validation
   * failures are logged and rethrown like query failures.
   * @param {Object} dbCtx
   * @param {Object} data parameters for the topicUpsert statement
   * @returns {Promise<Object>} engine info
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async topicSet(dbCtx, data) {
    const _scope = _fileScope('topicSet');
    this.logger.debug(_scope, 'called', data);

    const topicSetData = {
      publisherValidationUrl: null,
      leaseSecondsPreferred: null,
      leaseSecondsMin: null,
      leaseSecondsMax: null,
      ...data,
    };

    let result;
    try {
      this._topicSetDataValidate(topicSetData);
      result = await dbCtx.result(this.statement.topicUpsert, topicSetData);
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not set topic data');
      }
      this.logger.debug(_scope, 'success', { topicSetData, ...this._resultLog(result) });
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, result });
      throw e;
    }
  }


  /**
   * Store new content for a topic. contentType defaults to null.
   * All log output uses a copy of the data with the content truncated.
   * @param {Object} dbCtx
   * @param {Object} data parameters for the topicSetContent statement
   * @returns {Promise<Object>} engine info
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async topicSetContent(dbCtx, data) {
    const _scope = _fileScope('topicSetContent');
    const topicSetContentData = {
      contentType: null,
      ...data,
    };
    const logData = {
      ...topicSetContentData,
      content: common.logTruncate(topicSetContentData.content, 100),
    };
    // Log the truncated copy, never the raw content.
    this.logger.debug(_scope, 'called', logData);

    try {
      this._topicSetContentDataValidate(topicSetContentData);
      const result = await dbCtx.result(this.statement.topicSetContent, topicSetContentData);
      logData.result = this._resultLog(result);
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not set topic content');
      }
      this.logger.debug(_scope, 'success', { ...logData });
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, ...logData });
      throw e;
    }
  }


  /**
   * Update a topic's settings. Optional fields default to null; the data is
   * validated before the query is issued, and validation errors propagate
   * without being logged here.
   * @param {Object} dbCtx
   * @param {Object} data parameters for the topicUpdate statement
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async topicUpdate(dbCtx, data) {
    const _scope = _fileScope('topicUpdate');
    this.logger.debug(_scope, 'called', { data });

    const topicData = {
      leaseSecondsPreferred: null,
      leaseSecondsMin: null,
      leaseSecondsMax: null,
      publisherValidationUrl: null,
      ...data,
    };

    this._topicUpdateDataValidate(topicData);

    try {
      const result = await dbCtx.result(this.statement.topicUpdate, topicData);
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not update topic');
      }
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, topicData });
      throw e;
    }
  }


  /**
   * Claim verifications for a claimant.
   * @param {Object} dbCtx
   * @param {Number} wanted
   * @param {Number} claimTimeoutSeconds
   * @param {String} claimant
   * @returns {Promise<Array>} ids of the claimed rows
   */
  async verificationClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
    const _scope = _fileScope('verificationClaim');
    this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });

    let claims;
    try {
      await dbCtx.txIf(async (txCtx) => {
        claims = await txCtx.manyOrNone(this.statement.verificationClaim, { claimant, wanted, claimTimeoutSeconds });
      });
      return claims.map((r) => r.id);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, claimant, wanted, claimTimeoutSeconds });
      throw e;
    }
  }


  /**
   * Claim one specific verification.
   * @param {Object} dbCtx
   * @param {*} verificationId
   * @param {Number} claimTimeoutSeconds
   * @param {String} claimant
   * @returns {Promise<Object>} engine info
   */
  async verificationClaimById(dbCtx, verificationId, claimTimeoutSeconds, claimant) {
    const _scope = _fileScope('verificationClaimById');
    this.logger.debug(_scope, 'called', { verificationId, claimant, claimTimeoutSeconds });

    let result;
    try {
      await dbCtx.txIf(async (txCtx) => {
        result = await txCtx.result(this.statement.verificationClaimById, { verificationId, claimant, claimTimeoutSeconds });
      });
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, verificationId, claimant, claimTimeoutSeconds });
      throw e;
    }
  }


  /**
   * Run the verificationScrub statement for a completed verification.
   * @param {Object} dbCtx
   * @param {*} verificationId
   * @param {String} callback
   * @param {*} topicId
   * @returns {Promise<Object>} engine info
   * @throws {DBErrors.UnexpectedResult} if no rows were affected
   */
  async verificationComplete(dbCtx, verificationId, callback, topicId) {
    const _scope = _fileScope('verificationComplete');
    this.logger.debug(_scope, 'called', { verificationId, callback, topicId });

    let result;
    try {
      await dbCtx.txIf(async (txCtx) => {
        result = await txCtx.result(this.statement.verificationScrub, { verificationId, callback, topicId });
        if (result.rowCount < 1) {
          throw new DBErrors.UnexpectedResult('did not remove verifications');
        }
      });
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, verificationId, callback, topicId });
      throw e;
    }
    return this._engineInfo(result);
  }


  /**
   * Fetch a verification by its id.
   * @param {Object} dbCtx
   * @param {*} verificationId
   * @returns {Promise<Object|null>} the row, or null if none
   */
  async verificationGetById(dbCtx, verificationId) {
    const _scope = _fileScope('verificationGetById');
    this.logger.debug(_scope, 'called', { verificationId });

    try {
      return await dbCtx.oneOrNone(this.statement.verificationGetById, { verificationId });
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, verificationId });
      throw e;
    }
  }


  /**
   * Record a failed verification attempt, schedule the next attempt, and
   * release the claim, in one transaction.
   * @param {Object} dbCtx
   * @param {*} verificationId
   * @param {Number[]} retryDelays passed with the current attempt count to common.attemptRetrySeconds
   * @throws {DBErrors.UnexpectedResult} unless each update affected exactly one row
   */
  async verificationIncomplete(dbCtx, verificationId, retryDelays = [60]) {
    const _scope = _fileScope('verificationIncomplete');
    this.logger.debug(_scope, 'called', { verificationId });

    try {
      await dbCtx.txIf(async (txCtx) => {
        const { attempts } = await txCtx.one(this.statement.verificationAttempts, { verificationId });
        const nextAttemptDelaySeconds = common.attemptRetrySeconds(attempts, retryDelays);
        const incrementResult = await txCtx.result(this.statement.verificationAttemptIncrement, { verificationId, nextAttemptDelaySeconds });
        if (incrementResult.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not update verification attempts');
        }
        const doneResult = await txCtx.result(this.statement.verificationDone, { verificationId });
        if (doneResult.rowCount !== 1) {
          throw new DBErrors.UnexpectedResult('did not release verification');
        }
      });
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, verificationId });
      throw e;
    }
  }


  /**
   * Insert a new verification. Optional fields default to null; validation
   * failures are logged and rethrown like query failures.
   * @param {Object} dbCtx
   * @param {Object} verification parameters for the verificationInsert statement
   * @returns {Promise<*>} id of the inserted row
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async verificationInsert(dbCtx, verification) {
    const _scope = _fileScope('verificationInsert');
    this.logger.debug(_scope, 'called', { verification });

    const verificationData = {
      secret: null,
      httpRemoteAddr: null,
      httpFrom: null,
      requestId: null,
      ...verification,
    };

    try {
      this._verificationDataValidate(verificationData);
      const result = await dbCtx.result(this.statement.verificationInsert, verificationData);
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not insert verification');
      }
      const verificationId = result.rows[0].id;
      this.logger.debug(_scope, 'inserted verification', { verificationId });

      return verificationId;
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, verificationData });
      throw e;
    }
  }


  /**
   * Release the claim on a verification.
   * @param {Object} dbCtx
   * @param {*} verificationId
   * @returns {Promise<Object>} engine info
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async verificationRelease(dbCtx, verificationId) {
    const _scope = _fileScope('verificationRelease');
    this.logger.debug(_scope, 'called', { verificationId });

    try {
      const result = await dbCtx.result(this.statement.verificationDone, { verificationId });
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not release verification');
      }
      return this._engineInfo(result);
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, verificationId });
      throw e;
    }
  }


  /**
   * Update a verification. `reason` defaults to null; validation failures
   * are logged and rethrown like query failures.
   * @param {Object} dbCtx
   * @param {*} verificationId
   * @param {Object} data additional parameters for the verificationUpdate statement
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async verificationUpdate(dbCtx, verificationId, data) {
    const _scope = _fileScope('verificationUpdate');
    this.logger.debug(_scope, 'called', { verificationId, data });

    const verificationData = {
      reason: null,
      verificationId,
      ...data,
    };

    try {
      this._verificationUpdateDataValidate(verificationData);
      const result = await dbCtx.result(this.statement.verificationUpdate, verificationData);
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not update verification');
      }
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, verificationData });
      throw e;
    }
  }


  /**
   * Mark a verification as validated.
   * @param {Object} dbCtx
   * @param {*} verificationId
   * @throws {DBErrors.UnexpectedResult} unless exactly one row was affected
   */
  async verificationValidated(dbCtx, verificationId) {
    const _scope = _fileScope('verificationValidated');
    this.logger.debug(_scope, 'called', { verificationId });

    try {
      const result = await dbCtx.result(this.statement.verificationValidate, { verificationId });
      if (result.rowCount !== 1) {
        throw new DBErrors.UnexpectedResult('did not set verification validation');
      }
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e, verificationId });
      throw e;
    }
  }

}

module.exports = DatabasePostgres;