update dependencies and devDependencies, fix lint issues
[websub-hub] / src / db / postgres / index.js
/* eslint-disable security/detect-object-injection */
'use strict';

// Initialization options handed to pg-promise when it is required below.
// The DatabasePostgres constructor later attaches its query/error/receive
// handlers to this same object.
const pgpInitOptions = {
  capSQL: true,
};

const path = require('path');
const pgp = require('pg-promise')(pgpInitOptions);
const svh = require('../schema-version-helper');
const Database = require('../base');
const DBErrors = require('../errors');
const Listener = require('./listener');
const common = require('../../common');

// Produces the scope value passed as the first argument to logger calls in this file.
const _fileScope = common.fileScope(__filename);

// Parse INT8 (BIGINT) values with BigInt, and INT8[] values by running the
// previously registered array parser and mapping each element through BigInt.
const PGTypeIdINT8 = 20; // Type Id 20 == INT8 (BIGINT)
const PGTYpeIdINT8Array = 1016; // Type Id 1016 == INT8[] (BIGINT[])
pgp.pg.types.setTypeParser(PGTypeIdINT8, BigInt); // Type Id 20 = INT8 (BIGINT)
const parseBigIntArray = pgp.pg.types.getTypeParser(PGTYpeIdINT8Array); // Type Id 1016 = INT8[] (BIGINT[])
pgp.pg.types.setTypeParser(PGTYpeIdINT8Array, (a) => parseBigIntArray(a).map(BigInt));

// Lowest and highest schema versions this engine declares support for;
// handed to the schema-version helper when selecting migrations to apply.
const schemaVersionsSupported = {
  min: {
    major: 1,
    minor: 0,
    patch: 0,
  },
  max: {
    major: 1,
    minor: 0,
    patch: 4,
  },
};
36
37 class DatabasePostgres extends Database {
38 constructor(logger, options, _pgp = pgp) {
39 super(logger, options);
40
41 this.db = _pgp(options.db.connectionString);
42 this.schemaVersionsSupported = schemaVersionsSupported;
43
44 // Suppress QF warnings when running tests
45 this.noWarnings = options.db.noWarnings;
46
47 if (options.db.cacheEnabled) {
48 this.listener = new Listener(logger, this.db, {
49 ...options.db.listener,
50 channel: 'topic_changed',
51 dataCallback: this._topicChanged.bind(this),
52 connectionEstablishedCallback: this._listenerEstablished.bind(this),
53 connectionLostCallback: this._listenerLost.bind(this),
54 });
55 }
56
57 // Log queries
58 const queryLogLevel = options.db.queryLogLevel;
59 if (queryLogLevel) {
60 pgpInitOptions.query = (event) => {
61 // Quell outgoing pings
62 if (event?.query?.startsWith('NOTIFY')) {
63 return;
64 }
65 this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event || {}, ['query', 'params']) });
66 };
67 }
68
69 // Log errors
70 pgpInitOptions.error = (err, event) => {
71 this.logger.error(_fileScope('pgp:error'), '', { err, event });
72
73 // TODO: close connection on err.code === '57P03' database shutting down
74 };
75
76 // Deophidiate column names in-place, log results
77 pgpInitOptions.receive = ({ data, result, ctx: event }) => {
78 const exemplaryRow = data[0];
79 for (const prop in exemplaryRow) {
80 const camel = Database._camelfy(prop);
81 if (!(camel in exemplaryRow)) {
82 for (const d of data) {
83 d[camel] = d[prop];
84 delete d[prop];
85 }
86 }
87 }
88 if (queryLogLevel) {
89 // Quell outgoing pings
90 if (result && result.command === 'NOTIFY') {
91 return;
92 }
93 // Omitting .rows
94 const resultLog = common.pick(result || {}, ['command', 'rowCount', 'duration']);
95 this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog });
96 }
97 };
98
99 // Expose these for test coverage
100 this.pgpInitOptions = pgpInitOptions;
101 this._pgp = _pgp;
102
103 this._initStatements(_pgp);
104 }
105
106
107 _queryFileHelper(_pgp) {
108 return (file) => {
109 const _scope = _fileScope('_queryFile');
110 /* istanbul ignore next */
111 const qfParams = {
112 minify: true,
113 ...(this.noWarnings && { noWarnings: this.noWarnings }),
114 };
115 const qf = new _pgp.QueryFile(file, qfParams);
116 if (qf.error) {
117 this.logger.error(_scope, 'failed to create SQL statement', { error: qf.error, file });
118 throw qf.error;
119 }
120 return qf;
121 };
122 }
123
124
125 async initialize(applyMigrations = true) {
126 const _scope = _fileScope('initialize');
127 this.logger.debug(_scope, 'called', { applyMigrations });
128 if (applyMigrations) {
129 await this._initTables();
130 }
131 await super.initialize();
132 if (this.listener) {
133 await this.listener.start();
134 }
135 }
136
137
138 async _initTables(_pgp) {
139 const _scope = _fileScope('_initTables');
140 this.logger.debug(_scope, 'called', {});
141
142 const _queryFile = this._queryFileHelper(_pgp || this._pgp);
143
144 // Migrations rely upon this table, ensure it exists.
145 const metaVersionTable = '_meta_schema_version';
146
147 const tableExists = async (name) => this.db.oneOrNone('SELECT table_name FROM information_schema.tables WHERE table_name=$(name)', { name });
148 let metaExists = await tableExists(metaVersionTable);
149 if (!metaExists) {
150 const fPath = path.join(__dirname, 'sql', 'schema', 'init.sql');
151 const initSql = _queryFile(fPath);
152 const results = await this.db.multiResult(initSql);
153 this.logger.debug(_scope, 'executed init sql', { results });
154 metaExists = await tableExists(metaVersionTable);
155 /* istanbul ignore if */
156 if (!metaExists) {
157 throw new DBErrors.UnexpectedResult(`did not create ${metaVersionTable} table`);
158 }
159 this.logger.info(_scope, 'created schema version table', { metaVersionTable });
160 }
161
162 // Apply migrations
163 const currentSchema = await this._currentSchema();
164 const migrationsWanted = svh.unappliedSchemaVersions(__dirname, currentSchema, this.schemaVersionsSupported);
165 this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
166 for (const v of migrationsWanted) {
167 const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
168 try {
169 const migrationSql = _queryFile(fPath);
170 this.logger.debug(_scope, 'applying migration', { version: v });
171 const results = await this.db.multiResult(migrationSql);
172 this.logger.debug(_scope, 'migration results', { results });
173 this.logger.info(_scope, 'applied migration', { version: v });
174 } catch (e) {
175 this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v });
176 throw e;
177 }
178 }
179 }
180
181
182 _initStatements(_pgp) {
183 const _scope = _fileScope('_initStatements');
184 const _queryFile = this._queryFileHelper(_pgp);
185 this.statement = _pgp.utils.enumSql(path.join(__dirname, 'sql'), {}, _queryFile);
186 this.logger.debug(_scope, 'statements initialized', { statements: Object.keys(this.statement).length });
187 }
188
189
190 async healthCheck() {
191 const _scope = _fileScope('healthCheck');
192 this.logger.debug(_scope, 'called', {});
193 const c = await this.db.connect();
194 c.done();
195 return { serverVersion: c.client.serverVersion };
196 }
197
198
199 async _currentSchema() {
200 return this.db.one('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1');
201 }
202
203
204 async _closeConnection() {
205 const _scope = _fileScope('_closeConnection');
206 try {
207 if (this.listener) {
208 await this.listener.stop();
209 }
210 await this._pgp.end();
211 } catch (e) {
212 this.logger.error(_scope, 'failed', { error: e });
213 throw e;
214 }
215 }
216
217
218 /* istanbul ignore next */
219 async _purgeTables(really = false) {
220 const _scope = _fileScope('_purgeTables');
221 try {
222 if (really) {
223 await this.db.tx(async (t) => {
224 await t.batch([
225 'topic',
226 // 'topic_fetch_in_progress',
227 // 'verification',
228 // 'verification_in_progress',
229 // 'subscription',
230 // 'subscription_delivery_in_progress',
231 ].map(async (table) => t.query('TRUNCATE TABLE $(table:name) CASCADE', { table })));
232 });
233 }
234 } catch (e) {
235 this.logger.error(_scope, 'failed', { error: e });
236 throw e;
237 }
238 }
239
240
241 // eslint-disable-next-line class-methods-use-this
242 _engineInfo(result) {
243 return {
244 changes: result.rowCount,
245 lastInsertRowid: result.rows.length ? result.rows[0].id : undefined,
246 duration: result.duration,
247 };
248 }
249
250
251 // eslint-disable-next-line class-methods-use-this
252 _resultLog(result) {
253 return common.pick(result, ['command', 'rowCount', 'duration']);
254 }
255
256
257 /**
258 * Receive notices when topic entry is updated.
259 * Clear relevant cache entry.
260 * @param {string} payload topic changed event
261 */
262 _topicChanged(payload) {
263 const _scope = _fileScope('_topicChanged');
264 if (payload !== 'ping') {
265 this.logger.debug(_scope, 'called', { payload });
266 this.cache.delete(payload);
267 }
268 }
269
270
271 /**
272 * Called when a listener connection is opened.
273 * Enable cache.
274 */
275 _listenerEstablished() {
276 const _scope = _fileScope('_listenerEstablished');
277 this.logger.debug(_scope, 'called', {});
278 this.cache = new Map();
279 }
280
281
282 /**
283 * Called when a listener connection is closed.
284 * Disable cache.
285 */
286 _listenerLost() {
287 const _scope = _fileScope('_listenerLost');
288 this.logger.debug(_scope, 'called', {});
289 delete this.cache;
290 }
291
292
293 /**
294 * Return a cached entry, if available.
295 * @param {*} key key
296 * @returns {object=} cached data
297 */
298 _cacheGet(key) {
299 const _scope = _fileScope('_cacheGet');
300 if (this.cache?.has(key)) {
301 const cacheEntry = this.cache.get(key);
302 this.logger.debug(_scope, 'found cache entry', { key, ...common.pick(cacheEntry, ['added', 'hits', 'lastHit']) });
303 cacheEntry.hits += 1;
304 cacheEntry.lastHit = new Date();
305 return cacheEntry.data;
306 }
307 }
308
309
310 /**
311 * Store an entry in cache, if available.
312 * @param {*} key key
313 * @param {*} data data
314 */
315 _cacheSet(key, data) {
316 const _scope = _fileScope('_cacheSet');
317 if (this.cache) {
318 this.cache.set(key, {
319 added: new Date(),
320 hits: 0,
321 lastHit: undefined,
322 data,
323 });
324 this.logger.debug(_scope, 'added cache entry', { key });
325 }
326 }
327
328
329 async context(fn) {
330 return this.db.task(async (t) => fn(t));
331 }
332
333
334 // eslint-disable-next-line class-methods-use-this
335 async transaction(dbCtx, fn) {
336 return dbCtx.txIf(async (t) => fn(t));
337 }
338
339
340 async authenticationSuccess(dbCtx, identifier) {
341 const _scope = _fileScope('authenticationSuccess');
342 this.logger.debug(_scope, 'called', { identifier });
343
344 let result;
345 try {
346 result = await dbCtx.result(this.statement.authenticationSuccess, { identifier });
347 if (result.rowCount != 1) {
348 throw new DBErrors.UnexpectedResult('did not update authentication success event');
349 }
350 } catch (e) {
351 this.logger.error(_scope, 'failed', { error: e, identifier });
352 throw e;
353 }
354 }
355
356
357 async authenticationGet(dbCtx, identifier) {
358 const _scope = _fileScope('authenticationGet');
359 this.logger.debug(_scope, 'called', { identifier });
360
361 let auth;
362 try {
363 auth = await dbCtx.oneOrNone(this.statement.authenticationGet, { identifier });
364 return auth;
365 } catch (e) {
366 this.logger.error(_scope, 'failed', { error: e, identifier });
367 throw e;
368 }
369 }
370
371
372 async authenticationUpsert(dbCtx, identifier, credential) {
373 const _scope = _fileScope('authenticationUpsert');
374 const scrubbedCredential = '*'.repeat((credential || '').length);
375 this.logger.debug(_scope, 'called', { identifier, scrubbedCredential });
376
377 let result;
378 try {
379 result = await dbCtx.result(this.statement.authenticationUpsert, { identifier, credential });
380 if (result.rowCount != 1) {
381 throw new DBErrors.UnexpectedResult('did not upsert authentication');
382 }
383 } catch (e) {
384 this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential });
385 throw e;
386 }
387 }
388
389
390 async subscriptionsByTopicId(dbCtx, topicId) {
391 const _scope = _fileScope('subscriptionsByTopicId');
392 this.logger.debug(_scope, 'called', { topicId });
393
394 let count;
395 try {
396 count = await dbCtx.manyOrNone(this.statement.subscriptionsByTopicId, { topicId });
397 return count;
398 } catch (e) {
399 this.logger.error(_scope, 'failed', { error: e, topicId });
400 throw e;
401 }
402 }
403
404
405 async subscriptionCountByTopicUrl(dbCtx, topicUrl) {
406 const _scope = _fileScope('subscriptionCountByTopicUrl');
407 this.logger.debug(_scope, 'called', { topicUrl });
408
409 let count;
410 try {
411 count = await dbCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl });
412 return count;
413 } catch (e) {
414 this.logger.error(_scope, 'failed', { error: e, topicUrl });
415 throw e;
416 }
417 }
418
419
420 async subscriptionDelete(dbCtx, callback, topicId) {
421 const _scope = _fileScope('subscriptionDelete');
422 this.logger.debug(_scope, 'called', { callback, topicId });
423
424 try {
425 const result = await dbCtx.result(this.statement.subscriptionDelete, { callback, topicId });
426 return this._engineInfo(result);
427 } catch (e) {
428 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
429 throw e;
430 }
431 }
432
433
434 async subscriptionDeleteExpired(dbCtx, topicId) {
435 const _scope = _fileScope('subscriptionDeleteExpired');
436 this.logger.debug(_scope, 'called', { topicId });
437
438 try {
439 const result = await dbCtx.result(this.statement.subscriptionDeleteExpired, { topicId });
440 this.logger.debug(_scope, 'success', { topicId, deleted: result.rowCount });
441 return this._engineInfo(result);
442 } catch (e) {
443 this.logger.error(_scope, 'failed', { error: e, topicId });
444 throw e;
445 }
446 }
447
448
449 async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
450 const _scope = _fileScope('subscriptionDeliveryClaim');
451 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
452
453 try {
454 const claims = await dbCtx.txIf(async (txCtx) => {
455 return txCtx.manyOrNone(this.statement.subscriptionDeliveryClaim, { claimant, wanted, claimTimeoutSeconds });
456 });
457 return claims.map((r) => r.id);
458 } catch (e) {
459 this.logger.error(_scope, 'failed', { error: e, claimant, wanted, claimTimeoutSeconds });
460 throw e;
461 }
462 }
463
464
465 async subscriptionDeliveryClaimById(dbCtx, subscriptionId, claimTimeoutSeconds, claimant) {
466 const _scope = _fileScope('subscriptionDeliveryClaimById');
467 this.logger.debug(_scope, 'called', { subscriptionId, claimTimeoutSeconds, claimant });
468
469 let result;
470 try {
471 result = await dbCtx.txIf(async (txCtx) => {
472 result = await txCtx.result(this.statement.subscriptionDeliveryClaimById, { claimant, subscriptionId, claimTimeoutSeconds });
473 if (result.rowCount != 1) {
474 throw new DBErrors.UnexpectedResult('did not claim subscription delivery');
475 }
476 return result;
477 });
478 return this._engineInfo(result);
479 } catch (e) {
480 this.logger.error(_scope, 'failed', { error: e, claimant, subscriptionId, claimTimeoutSeconds });
481 throw e;
482 }
483 }
484
485
486 async subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) {
487 const _scope = _fileScope('subscriptionDeliveryComplete');
488 this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated });
489
490 let result;
491 try {
492 await dbCtx.txIf(async (txCtx) => {
493 result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId, topicContentUpdated });
494 if (result.rowCount != 1) {
495 throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
496 }
497 result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
498 if (result.rowCount != 1) {
499 throw new DBErrors.UnexpectedResult('did not release subscription delivery');
500 }
501 });
502 } catch (e) {
503 this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated });
504 throw e;
505 }
506 }
507
508
509 async subscriptionDeliveryGone(dbCtx, callback, topicId) {
510 const _scope = _fileScope('subscriptionDeliveryGone');
511 this.logger.debug(_scope, 'called', { callback, topicId });
512
513 let result;
514 try {
515 await dbCtx.txIf(async (txCtx) => {
516 result = await txCtx.result(this.statement.subscriptionDelete, { callback, topicId });
517 if (result.rowCount != 1) {
518 throw new DBErrors.UnexpectedResult('did not delete subscription');
519 }
520 // Delete cascades to delivery
521 // result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
522 // if (result.rowCount != 1) {
523 // throw new DBErrors.UnexpectedResult('did not release subscription delivery');
524 // }
525 });
526 } catch (e) {
527 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
528 throw e;
529 }
530 }
531
532
533 async subscriptionDeliveryIncomplete(dbCtx, callback, topicId, retryDelays = [60]) {
534 const _scope = _fileScope('subscriptionDeliveryIncomplete');
535 this.logger.debug(_scope, 'called', { callback, topicId, retryDelays });
536
537 let result;
538 try {
539 await dbCtx.txIf(async (txCtx) => {
540 const { currentAttempt } = await txCtx.one(this.statement.subscriptionDeliveryAttempts, { callback, topicId });
541 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
542 result = await txCtx.result(this.statement.subscriptionDeliveryFailure, { nextAttemptDelaySeconds, callback, topicId });
543 if (result.rowCount != 1) {
544 throw new DBErrors.UnexpectedResult('did not set subscription delivery failure');
545 }
546 result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
547 if (result.rowCount != 1) {
548 throw new DBErrors.UnexpectedResult('did not release subscription delivery');
549 }
550 });
551 } catch (e) {
552 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
553 throw e;
554 }
555 }
556
557
558 async subscriptionGet(dbCtx, callback, topicId) {
559 const _scope = _fileScope('subscriptionGet');
560 this.logger.debug(_scope, 'called', { callback, topicId });
561
562 let subscription;
563 try {
564 subscription = await dbCtx.oneOrNone(this.statement.subscriptionGet, { callback, topicId });
565 return subscription;
566 } catch (e) {
567 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
568 throw e;
569 }
570 }
571
572
573 async subscriptionGetById(dbCtx, subscriptionId) {
574 const _scope = _fileScope('subscriptionGetById');
575 this.logger.debug(_scope, 'called', { subscriptionId });
576
577 let subscription;
578 try {
579 subscription = await dbCtx.oneOrNone(this.statement.subscriptionGetById, { subscriptionId });
580 return subscription;
581 } catch (e) {
582 this.logger.error(_scope, 'failed', { error: e, subscriptionId });
583 throw e;
584 }
585 }
586
587
588 async subscriptionUpdate(dbCtx, data) {
589 const _scope = _fileScope('subscriptionUpdate');
590 this.logger.debug(_scope, 'called', { data });
591
592 const subscriptionData = {
593 ...data,
594 };
595
596 this._subscriptionUpdateDataValidate(subscriptionData);
597
598 let result;
599 try {
600 result = await dbCtx.result(this.statement.subscriptionUpdate, subscriptionData);
601 if (result.rowCount != 1) {
602 throw new DBErrors.UnexpectedResult('did not update subscription');
603 }
604 } catch (e) {
605 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
606 throw e;
607 }
608 }
609
610
611 async subscriptionUpsert(dbCtx, data) {
612 const _scope = _fileScope('subscriptionUpsert');
613 this.logger.debug(_scope, 'called', { ...data });
614
615 const subscriptionData = {
616 secret: null,
617 httpRemoteAddr: null,
618 httpFrom: null,
619 ...data,
620 };
621 this._subscriptionUpsertDataValidate(subscriptionData);
622
623 let result;
624 try {
625 result = await dbCtx.result(this.statement.subscriptionUpsert, subscriptionData);
626 if (result.rowCount != 1) {
627 throw new DBErrors.UnexpectedResult('did not upsert subscription');
628 }
629 return this._engineInfo(result);
630 } catch (e) {
631 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
632 throw e;
633 }
634 }
635
636
637 async topicDeleted(dbCtx, topicId) {
638 const _scope = _fileScope('topicDeleted');
639 this.logger.debug(_scope, 'called', { topicId });
640
641 let result;
642 try {
643 result = await dbCtx.result(this.statement.topicDeleted, { topicId });
644 if (result.rowCount != 1) {
645 throw new DBErrors.UnexpectedResult('did not update topic as deleted');
646 }
647 } catch (e) {
648 this.logger.error(_scope, 'failed to update topic as deleted', { error: e, topicId });
649 throw e;
650 }
651 }
652
653
654 async topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
655 const _scope = _fileScope('topicFetchClaim');
656 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
657
658 let claims;
659 try {
660 await dbCtx.txIf(async (txCtx) => {
661 claims = await txCtx.manyOrNone(this.statement.topicContentFetchClaim, { claimant, wanted, claimTimeoutSeconds });
662 });
663 return claims.map((r) => r.id);
664 } catch (e) {
665 this.logger.error(_scope, 'failed to claim topics for fetch', { error: e });
666 throw e;
667 }
668 }
669
670
671 async topicFetchClaimById(dbCtx, topicId, claimTimeoutSeconds, claimant) {
672 const _scope = _fileScope('topicFetchClaimById');
673 this.logger.debug(_scope, 'called', { topicId, claimTimeoutSeconds, claimant });
674
675 let result;
676 try {
677 await dbCtx.txIf(async (txCtx) => {
678 result = await txCtx.result(this.statement.topicContentFetchClaimById, { topicId, claimant, claimTimeoutSeconds });
679 });
680 return this._engineInfo(result);
681 } catch (e) {
682 this.logger.error(_scope, 'failed', { error: e, topicId });
683 throw e;
684 }
685 }
686
687
688 async topicFetchComplete(dbCtx, topicId) {
689 const _scope = _fileScope('topicFetchComplete');
690 this.logger.debug(_scope, 'called', { topicId });
691
692 let result;
693 try {
694 await dbCtx.txIf(async (txCtx) => {
695 result = await txCtx.result(this.statement.topicAttemptsReset, { topicId });
696 if (result.rowCount != 1) {
697 throw new DBErrors.UnexpectedResult('did not reset topic attempts');
698 }
699 result = await txCtx.result(this.statement.topicContentFetchDone, { topicId });
700 if (result.rowCount != 1) {
701 throw new DBErrors.UnexpectedResult('did not release topic fetch');
702 }
703 });
704 this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
705 return this._engineInfo(result);
706 } catch (e) {
707 this.logger.error(_scope, 'failed', { error: e, result, topicId });
708 throw e;
709 }
710 }
711
712
713 async topicFetchIncomplete(dbCtx, topicId, retryDelays = [60]) {
714 const _scope = _fileScope('topicFetchIncomplete');
715 this.logger.debug(_scope, 'called', { topicId });
716
717 let result;
718 try {
719 result = await dbCtx.txIf(async (txCtx) => {
720 const { contentFetchAttemptsSinceSuccess: currentAttempt } = await txCtx.one(this.statement.topicAttempts, { topicId });
721 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
722 result = await txCtx.result(this.statement.topicAttemptsIncrement, { topicId, nextAttemptDelaySeconds });
723 if (result.rowCount != 1) {
724 throw new DBErrors.UnexpectedResult('did not set topic attempts');
725 }
726 result = await txCtx.result(this.statement.topicContentFetchDone, { topicId });
727 if (result.rowCount != 1) {
728 throw new DBErrors.UnexpectedResult('did not release topic fetch');
729 }
730 return result;
731 });
732 this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
733 return this._engineInfo(result);
734 } catch (e) {
735 this.logger.error(_scope, 'failed', { error: e, result, topicId });
736 throw e;
737 }
738 }
739
740
741 async topicFetchRequested(dbCtx, topicId) {
742 const _scope = _fileScope('topicFetchRequested');
743 this.logger.debug(_scope, 'called', { topicId });
744
745 let result;
746 try {
747 result = await dbCtx.result(this.statement.topicContentFetchRequested, { topicId });
748 if (result.rowCount != 1) {
749 throw new DBErrors.UnexpectedResult('did not set topic fetch requested');
750 }
751 this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
752 return this._engineInfo(result);
753 } catch (e) {
754 this.logger.error(_scope, 'failed', { error: e, topicId });
755 throw e;
756 }
757 }
758
759
760 async topicGetAll(dbCtx) {
761 const _scope = _fileScope('topicGetAll');
762 this.logger.debug(_scope, 'called');
763
764 let topics;
765 try {
766 topics = await dbCtx.manyOrNone(this.statement.topicGetInfoAll);
767 } catch (e) {
768 this.logger.error(_scope, 'failed', { error: e, topics });
769 throw e;
770 }
771 if (topics) {
772 topics = topics.map(this._topicDefaults.bind(this));
773 }
774 return topics;
775 }
776
777
778 async topicGetById(dbCtx, topicId, applyDefaults = true) {
779 const _scope = _fileScope('topicGetById');
780 this.logger.debug(_scope, 'called', { topicId });
781
782 let topic;
783 try {
784 topic = await dbCtx.oneOrNone(this.statement.topicGetById, { topicId });
785 if (applyDefaults) {
786 topic = this._topicDefaults(topic);
787 }
788 return topic;
789 } catch (e) {
790 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
791 throw e;
792 }
793 }
794
795
796 async topicGetByUrl(dbCtx, topicUrl, applyDefaults = true) {
797 const _scope = _fileScope('topicGetByUrl');
798 this.logger.debug(_scope, 'called', { topicUrl });
799
800 let topic;
801 try {
802 topic = await dbCtx.oneOrNone(this.statement.topicGetByUrl, { topicUrl });
803 if (applyDefaults) {
804 topic = this._topicDefaults(topic);
805 }
806 return topic;
807 } catch (e) {
808 this.logger.error(_scope, 'failed', { error: e, topic, topicUrl });
809 throw e;
810 }
811 }
812
813
814 async topicGetContentById(dbCtx, topicId) {
815 const _scope = _fileScope('topicGetContentById');
816 this.logger.debug(_scope, 'called', { topicId });
817
818 let topic;
819 try {
820 topic = this._cacheGet(topicId);
821 if (topic) {
822 return topic;
823 }
824 topic = await dbCtx.oneOrNone(this.statement.topicGetContentById, { topicId });
825 const topicWithDefaults = this._topicDefaults(topic);
826 this._cacheSet(topicId, topicWithDefaults);
827 return topicWithDefaults;
828 } catch (e) {
829 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
830 throw e;
831 }
832 }
833
834
835 async topicPendingDelete(dbCtx, topicId) {
836 const _scope = _fileScope('topicPendingDelete');
837 this.logger.debug(_scope, 'called', { topicId });
838
839 try {
840 await dbCtx.txIf(async (txCtx) => {
841 const topic = await txCtx.one(this.statement.topicGetById, { topicId });
842 if (!topic.isDeleted) {
843 this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
844 return;
845 }
846
847 const { count: subscriberCount } = await txCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl: topic.url });
848 if (subscriberCount) {
849 this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
850 return;
851 }
852
853 const result = await txCtx.result(this.statement.topicDeleteById, { topicId });
854 if (result.rowCount !== 1) {
855 throw new DBErrors.UnexpectedResult('did not delete topic');
856 }
857 });
858 this.logger.debug(_scope, 'success', { topicId });
859 } catch (e) {
860 this.logger.error(_scope, 'failed', { error: e, topicId });
861 throw e;
862 }
863 }
864
865
866 async topicPublishHistory(dbCtx, topicId, days) {
867 const _scope = _fileScope('topicPublishHistory');
868 this.logger.debug(_scope, 'called', { topicId, days });
869
870 const events = await dbCtx.manyOrNone(this.statement.topicPublishHistory, { topicIds: [topicId], daysAgo: days });
871 const history = Array.from({ length: days }, () => 0);
872 events.forEach(({ daysAgo, contentUpdates }) => history[daysAgo] = Number(contentUpdates));
873
874 return history;
875 }
876
877
878 async topicSet(dbCtx, data) {
879 const _scope = _fileScope('topicSet');
880 this.logger.debug(_scope, 'called', data);
881
882 const topicSetData = {
883 publisherValidationUrl: null,
884 leaseSecondsPreferred: null,
885 leaseSecondsMin: null,
886 leaseSecondsMax: null,
887 ...data,
888 };
889
890 let result;
891 try {
892 this._topicSetDataValidate(topicSetData);
893 result = await dbCtx.result(this.statement.topicUpsert, topicSetData);
894 if (result.rowCount != 1) {
895 throw new DBErrors.UnexpectedResult('did not set topic data');
896 }
897 this.logger.debug(_scope, 'success', { topicSetData, ...this._resultLog(result) });
898 return this._engineInfo(result);
899 } catch (e) {
900 this.logger.error(_scope, 'failed', { error: e, result });
901 throw e;
902 }
903 }
904
905
906 async topicSetContent(dbCtx, data) {
907 const _scope = _fileScope('topicSetContent');
908 const topicSetContentData = {
909 contentType: null,
910 httpETag: null,
911 httpLastModified: null,
912 ...data,
913 };
914 const logData = {
915 ...topicSetContentData,
916 content: common.logTruncate(topicSetContentData.content, 100),
917 };
918 this.logger.debug(_scope, 'called', data);
919
920 let result;
921 try {
922 this._topicSetContentDataValidate(topicSetContentData);
923 result = await dbCtx.result(this.statement.topicSetContent, topicSetContentData);
924 logData.result = this._resultLog(result);
925 if (result.rowCount != 1) {
926 throw new DBErrors.UnexpectedResult('did not set topic content');
927 }
928 result = await dbCtx.result(this.statement.topicSetContentHistory, {
929 topicId: data.topicId,
930 contentHash: data.contentHash,
931 contentSize: data.content.length,
932 });
933 if (result.rowCount != 1) {
934 throw new DBErrors.UnexpectedResult('did not set topic content history');
935 }
936 this.logger.debug(_scope, 'success', { ...logData });
937 return this._engineInfo(result);
938 } catch (e) {
939 this.logger.error(_scope, 'failed', { error: e, ...logData });
940 throw e;
941 }
942 }
943
944
945 async topicUpdate(dbCtx, data) {
946 const _scope = _fileScope('topicUpdate');
947 this.logger.debug(_scope, 'called', { data });
948
949 const topicData = {
950 leaseSecondsPreferred: null,
951 leaseSecondsMin: null,
952 leaseSecondsMax: null,
953 publisherValidationUrl: null,
954 ...data,
955 };
956
957 this._topicUpdateDataValidate(topicData);
958
959 let result;
960 try {
961 result = await dbCtx.result(this.statement.topicUpdate, topicData);
962 if (result.rowCount != 1) {
963 throw new DBErrors.UnexpectedResult('did not update topic');
964 }
965 } catch (e) {
966 this.logger.error(_scope, 'failed', { error: e, topicData });
967 throw e;
968 }
969 }
970
971
972 async verificationClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
973 const _scope = _fileScope('verificationClaim');
974 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
975
976 let result;
977 try {
978 await dbCtx.txIf(async (txCtx) => {
979 result = await txCtx.manyOrNone(this.statement.verificationClaim, { claimant, wanted, claimTimeoutSeconds });
980 });
981 return result.map((r) => r.id);
982 } catch (e) {
983 this.logger.error(_scope, 'failed', { wanted, claimTimeoutSeconds });
984 throw e;
985 }
986 }
987
988
989
990 async verificationClaimById(dbCtx, verificationId, claimTimeoutSeconds, claimant) {
991 const _scope = _fileScope('verificationClaimById');
992 this.logger.debug(_scope, 'called', { verificationId, claimant, claimTimeoutSeconds });
993
994 let result;
995 try {
996 await dbCtx.txIf(async (txCtx) => {
997 result = await txCtx.result(this.statement.verificationClaimById, { verificationId, claimant, claimTimeoutSeconds });
998 });
999 return this._engineInfo(result);
1000 } catch (e) {
1001 this.logger.error(_scope, 'failed', { verificationId, claimant, claimTimeoutSeconds });
1002 throw e;
1003 }
1004 }
1005
1006
1007 async verificationComplete(dbCtx, verificationId, callback, topicId) {
1008 const _scope = _fileScope('verificationComplete');
1009 this.logger.debug(_scope, 'called', { verificationId });
1010
1011 let result;
1012 try {
1013 await dbCtx.txIf(async (txCtx) => {
1014 result = await txCtx.result(this.statement.verificationScrub, { verificationId, callback, topicId });
1015 if (result.rowCount < 1) {
1016 throw new DBErrors.UnexpectedResult('did not remove verifications');
1017 }
1018 });
1019 } catch (e) {
1020 this.logger.error(_scope, 'failed', { verificationId });
1021 throw e;
1022 }
1023 return this._engineInfo(result);
1024 }
1025
1026
1027 async verificationGetById(dbCtx, verificationId) {
1028 const _scope = _fileScope('verificationGetById');
1029 this.logger.debug(_scope, 'called', { verificationId });
1030
1031 let verification;
1032 try {
1033 verification = await dbCtx.oneOrNone(this.statement.verificationGetById, { verificationId });
1034 return verification;
1035 } catch (e) {
1036 this.logger.error(_scope, 'failed', { error: e, verificationId });
1037 throw e;
1038 }
1039 }
1040
1041
1042 async verificationIncomplete(dbCtx, verificationId, retryDelays = [60]) {
1043 const _scope = _fileScope('verificationIncomplete');
1044 this.logger.debug(_scope, 'called', { verificationId });
1045
1046 let result;
1047 try {
1048 await dbCtx.txIf(async (txCtx) => {
1049 const { attempts } = await txCtx.one(this.statement.verificationAttempts, { verificationId });
1050 const nextAttemptDelaySeconds = common.attemptRetrySeconds(attempts, retryDelays);
1051 result = await txCtx.result(this.statement.verificationAttemptIncrement, { verificationId, nextAttemptDelaySeconds });
1052 if (result.rowCount != 1) {
1053 throw new DBErrors.UnexpectedResult('did not update verification attempts');
1054 }
1055 result = await txCtx.result(this.statement.verificationDone, { verificationId });
1056 if (result.rowCount != 1) {
1057 throw new DBErrors.UnexpectedResult('did not release verification');
1058 }
1059 });
1060 } catch (e) {
1061 this.logger.error(_scope, 'failed', { error: e, verificationId });
1062 throw e;
1063 }
1064 }
1065
1066
1067 async verificationInsert(dbCtx, verification) {
1068 const _scope = _fileScope('verificationInsert');
1069 this.logger.debug(_scope, 'called', { verification });
1070
1071 const verificationData = {
1072 secret: null,
1073 httpRemoteAddr: null,
1074 httpFrom: null,
1075 requestId: null,
1076 ...verification,
1077 };
1078
1079 let result, verificationId;
1080 try {
1081 this._verificationDataValidate(verificationData);
1082 result = await dbCtx.result(this.statement.verificationInsert, verificationData);
1083 if (result.rowCount != 1) {
1084 throw new DBErrors.UnexpectedResult('did not insert verification');
1085 }
1086 verificationId = result.rows[0].id;
1087 this.logger.debug(_scope, 'inserted verification', { verificationId });
1088
1089 return verificationId;
1090 } catch (e) {
1091 this.logger.error(_scope, 'failed', { error: e, verificationData });
1092 throw e;
1093 }
1094 }
1095
1096
1097 async verificationRelease(dbCtx, verificationId) {
1098 const _scope = _fileScope('verificationRelease');
1099 this.logger.debug(_scope, 'called', { verificationId });
1100
1101 let result;
1102 try {
1103 result = await dbCtx.result(this.statement.verificationDone, { verificationId });
1104 if (result.rowCount != 1) {
1105 throw new DBErrors.UnexpectedResult('did not release verification');
1106 }
1107 return this._engineInfo(result);
1108 } catch (e) {
1109 this.logger.error(_scope, 'failed', { error: e, verificationId });
1110 throw e;
1111 }
1112 }
1113
1114
1115 async verificationUpdate(dbCtx, verificationId, data) {
1116 const _scope = _fileScope('verificationUpdate');
1117 this.logger.debug(_scope, 'called', { verificationId, data });
1118
1119 const verificationData = {
1120 reason: null,
1121 verificationId,
1122 ...data,
1123 };
1124
1125 let result;
1126 try {
1127 this._verificationUpdateDataValidate(verificationData);
1128 result = await dbCtx.result(this.statement.verificationUpdate, verificationData);
1129 if (result.rowCount != 1) {
1130 throw new DBErrors.UnexpectedResult('did not update verification');
1131 }
1132 } catch (e) {
1133 this.logger.error(_scope, 'failed', { error: e, verificationData });
1134 throw e;
1135 }
1136 }
1137
1138
1139 async verificationValidated(dbCtx, verificationId) {
1140 const _scope = _fileScope('verificationValidated');
1141 this.logger.debug(_scope, 'called', { verificationId });
1142
1143 let result;
1144 try {
1145 result = await dbCtx.result(this.statement.verificationValidate, { verificationId });
1146 if (result.rowCount != 1) {
1147 throw new DBErrors.UnexpectedResult('did not set verification validation');
1148 }
1149 } catch (e) {
1150 this.logger.error(_scope, 'failed', { error: e, verificationId });
1151 throw e;
1152 }
1153 }
1154
1155 }
1156
1157 module.exports = DatabasePostgres;