update dependencies, fixes to support new authentication features
[websub-hub] / src / db / postgres / index.js
1 /* eslint-disable security/detect-object-injection */
2 'use strict';
3
/**
 * Initialization options handed to pg-promise when the library is loaded.
 * The DatabasePostgres constructor later attaches its query, error, and
 * receive event handlers onto this same object.
 */
const pgpInitOptions = { capSQL: true };
7
8 const path = require('path');
9 const pgp = require('pg-promise')(pgpInitOptions);
10 const svh = require('../schema-version-helper');
11 const Database = require('../base');
12 const DBErrors = require('../errors');
13 const Listener = require('./listener');
14 const common = require('../../common');
15
// Helper producing the scope label passed as first argument to logger calls in this file.
const _fileScope = common.fileScope(__filename);

// Postgres type ids whose result parsers are overridden below.
const PGTypeIdINT8 = 20; // INT8 (BIGINT)
const PGTYpeIdINT8Array = 1016; // INT8[] (BIGINT[])

// Parse INT8 (BIGINT) column values with BigInt.
pgp.pg.types.setTypeParser(PGTypeIdINT8, BigInt);

// Parse INT8[] (BIGINT[]) columns with the stock array parser, then convert each element with BigInt.
const parseBigIntArray = pgp.pg.types.getTypeParser(PGTYpeIdINT8Array);
pgp.pg.types.setTypeParser(PGTYpeIdINT8Array, (raw) => parseBigIntArray(raw).map(BigInt));
23
/**
 * Lowest and highest schema versions this engine supports.
 * Passed to the schema-version helper when selecting migrations to apply.
 */
const schemaVersionsSupported = {
  min: { major: 1, minor: 0, patch: 0 },
  max: { major: 1, minor: 1, patch: 0 },
};
36
37 class DatabasePostgres extends Database {
38 constructor(logger, options, _pgp = pgp) {
39 super(logger, options);
40
41 this.db = _pgp(options.db.connectionString);
42 this.schemaVersionsSupported = schemaVersionsSupported;
43
44 // Suppress QF warnings when running tests
45 this.noWarnings = options.db.noWarnings;
46
47 if (options.db.cacheEnabled) {
48 this.listener = new Listener(logger, this.db, {
49 ...options.db.listener,
50 channel: 'topic_changed',
51 dataCallback: this._topicChanged.bind(this),
52 connectionEstablishedCallback: this._listenerEstablished.bind(this),
53 connectionLostCallback: this._listenerLost.bind(this),
54 });
55 }
56
57 // Log queries
58 const queryLogLevel = options.db.queryLogLevel;
59 if (queryLogLevel) {
60 pgpInitOptions.query = (event) => {
61 // Quell outgoing pings
62 if (event?.query?.startsWith('NOTIFY')) {
63 return;
64 }
65 this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event || {}, ['query', 'params']) });
66 };
67 }
68
69 // Log errors
70 pgpInitOptions.error = (err, event) => {
71 this.logger.error(_fileScope('pgp:error'), '', { err, event });
72
73 // TODO: close connection on err.code === '57P03' database shutting down
74 };
75
76 // Deophidiate column names in-place, log results
77 pgpInitOptions.receive = ({ data, result, ctx: event }) => {
78 const exemplaryRow = data[0];
79 for (const prop in exemplaryRow) {
80 const camel = Database._camelfy(prop);
81 if (!(camel in exemplaryRow)) {
82 for (const d of data) {
83 d[camel] = d[prop];
84 delete d[prop];
85 }
86 }
87 }
88 if (queryLogLevel) {
89 // Quell outgoing pings
90 if (result && result.command === 'NOTIFY') {
91 return;
92 }
93 // Omitting .rows
94 const resultLog = common.pick(result || {}, ['command', 'rowCount', 'duration']);
95 this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog });
96 }
97 };
98
99 // Expose these for test coverage
100 this.pgpInitOptions = pgpInitOptions;
101 this._pgp = _pgp;
102
103 this._initStatements(_pgp);
104 }
105
106
107 _queryFileHelper(_pgp) {
108 return (file) => {
109 const _scope = _fileScope('_queryFile');
110 /* istanbul ignore next */
111 const qfParams = {
112 minify: true,
113 ...(this.noWarnings && { noWarnings: this.noWarnings }),
114 };
115 const qf = new _pgp.QueryFile(file, qfParams);
116 if (qf.error) {
117 this.logger.error(_scope, 'failed to create SQL statement', { error: qf.error, file });
118 throw qf.error;
119 }
120 return qf;
121 };
122 }
123
124
125 async initialize(applyMigrations = true) {
126 const _scope = _fileScope('initialize');
127 this.logger.debug(_scope, 'called', { applyMigrations });
128 if (applyMigrations) {
129 await this._initTables();
130 }
131 await super.initialize();
132 if (this.listener) {
133 await this.listener.start();
134 }
135 }
136
137
138 async _initTables(_pgp) {
139 const _scope = _fileScope('_initTables');
140 this.logger.debug(_scope, 'called', {});
141
142 const _queryFile = this._queryFileHelper(_pgp || this._pgp);
143
144 // Migrations rely upon this table, ensure it exists.
145 const metaVersionTable = '_meta_schema_version';
146
147 const tableExists = async (name) => this.db.oneOrNone('SELECT table_name FROM information_schema.tables WHERE table_name=$(name)', { name });
148 let metaExists = await tableExists(metaVersionTable);
149 if (!metaExists) {
150 const fPath = path.join(__dirname, 'sql', 'schema', 'init.sql');
151 const initSql = _queryFile(fPath);
152 const results = await this.db.multiResult(initSql);
153 this.logger.debug(_scope, 'executed init sql', { results });
154 metaExists = await tableExists(metaVersionTable);
155 /* istanbul ignore if */
156 if (!metaExists) {
157 throw new DBErrors.UnexpectedResult(`did not create ${metaVersionTable} table`);
158 }
159 this.logger.info(_scope, 'created schema version table', { metaVersionTable });
160 }
161
162 // Apply migrations
163 const currentSchema = await this._currentSchema();
164 const migrationsWanted = svh.unappliedSchemaVersions(__dirname, currentSchema, this.schemaVersionsSupported);
165 this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
166 for (const v of migrationsWanted) {
167 const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
168 try {
169 const migrationSql = _queryFile(fPath);
170 this.logger.debug(_scope, 'applying migration', { version: v });
171 const results = await this.db.multiResult(migrationSql);
172 this.logger.debug(_scope, 'migration results', { results });
173 this.logger.info(_scope, 'applied migration', { version: v });
174 } catch (e) {
175 this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v });
176 throw e;
177 }
178 }
179 }
180
181
182 _initStatements(_pgp) {
183 const _scope = _fileScope('_initStatements');
184 const _queryFile = this._queryFileHelper(_pgp);
185 this.statement = _pgp.utils.enumSql(path.join(__dirname, 'sql'), {}, _queryFile);
186 this.logger.debug(_scope, 'statements initialized', { statements: Object.keys(this.statement).length });
187 }
188
189
190 async healthCheck() {
191 const _scope = _fileScope('healthCheck');
192 this.logger.debug(_scope, 'called', {});
193 const c = await this.db.connect();
194 c.done();
195 return { serverVersion: c.client.serverVersion };
196 }
197
198
199 async _currentSchema() {
200 return this.db.one('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1');
201 }
202
203
204 async _closeConnection() {
205 const _scope = _fileScope('_closeConnection');
206 try {
207 if (this.listener) {
208 await this.listener.stop();
209 }
210 await this._pgp.end();
211 } catch (e) {
212 this.logger.error(_scope, 'failed', { error: e });
213 throw e;
214 }
215 }
216
217
218 /* istanbul ignore next */
219 async _purgeTables(really = false) {
220 const _scope = _fileScope('_purgeTables');
221 try {
222 if (really) {
223 await this.db.tx(async (t) => {
224 await t.batch([
225 'topic',
226 // 'topic_fetch_in_progress',
227 // 'verification',
228 // 'verification_in_progress',
229 // 'subscription',
230 // 'subscription_delivery_in_progress',
231 ].map(async (table) => t.query('TRUNCATE TABLE $(table:name) CASCADE', { table })));
232 });
233 }
234 } catch (e) {
235 this.logger.error(_scope, 'failed', { error: e });
236 throw e;
237 }
238 }
239
240
241 // eslint-disable-next-line class-methods-use-this
242 _engineInfo(result) {
243 return {
244 changes: result.rowCount,
245 lastInsertRowid: result.rows.length ? result.rows[0].id : undefined,
246 duration: result.duration,
247 };
248 }
249
250
251 // eslint-disable-next-line class-methods-use-this
252 _resultLog(result) {
253 return common.pick(result, ['command', 'rowCount', 'duration']);
254 }
255
256
257 /**
258 * Receive notices when topic entry is updated.
259 * Clear relevant cache entry.
260 * @param {string} payload topic changed event
261 */
262 _topicChanged(payload) {
263 const _scope = _fileScope('_topicChanged');
264 if (payload !== 'ping') {
265 this.logger.debug(_scope, 'called', { payload });
266 this.cache.delete(payload);
267 }
268 }
269
270
271 /**
272 * Called when a listener connection is opened.
273 * Enable cache.
274 */
275 _listenerEstablished() {
276 const _scope = _fileScope('_listenerEstablished');
277 this.logger.debug(_scope, 'called', {});
278 this.cache = new Map();
279 }
280
281
282 /**
283 * Called when a listener connection is closed.
284 * Disable cache.
285 */
286 _listenerLost() {
287 const _scope = _fileScope('_listenerLost');
288 this.logger.debug(_scope, 'called', {});
289 delete this.cache;
290 }
291
292
293 /**
294 * Return a cached entry, if available.
295 * @param {*} key key
296 * @returns {object=} cached data
297 */
298 _cacheGet(key) {
299 const _scope = _fileScope('_cacheGet');
300 if (this.cache?.has(key)) {
301 const cacheEntry = this.cache.get(key);
302 this.logger.debug(_scope, 'found cache entry', { key, ...common.pick(cacheEntry, ['added', 'hits', 'lastHit']) });
303 cacheEntry.hits += 1;
304 cacheEntry.lastHit = new Date();
305 return cacheEntry.data;
306 }
307 }
308
309
310 /**
311 * Store an entry in cache, if available.
312 * @param {*} key key
313 * @param {*} data data
314 */
315 _cacheSet(key, data) {
316 const _scope = _fileScope('_cacheSet');
317 if (this.cache) {
318 this.cache.set(key, {
319 added: new Date(),
320 hits: 0,
321 lastHit: undefined,
322 data,
323 });
324 this.logger.debug(_scope, 'added cache entry', { key });
325 }
326 }
327
328
329 async context(fn) {
330 return this.db.task(async (t) => fn(t));
331 }
332
333
334 // eslint-disable-next-line class-methods-use-this
335 async transaction(dbCtx, fn) {
336 return dbCtx.txIf(async (t) => fn(t));
337 }
338
339
340 async authenticationSuccess(dbCtx, identifier) {
341 const _scope = _fileScope('authenticationSuccess');
342 this.logger.debug(_scope, 'called', { identifier });
343
344 let result;
345 try {
346 result = await dbCtx.result(this.statement.authenticationSuccess, { identifier });
347 if (result.rowCount != 1) {
348 throw new DBErrors.UnexpectedResult('did not update authentication success event');
349 }
350 } catch (e) {
351 this.logger.error(_scope, 'failed', { error: e, identifier });
352 throw e;
353 }
354 }
355
356
357 async authenticationGet(dbCtx, identifier) {
358 const _scope = _fileScope('authenticationGet');
359 this.logger.debug(_scope, 'called', { identifier });
360
361 let auth;
362 try {
363 auth = await dbCtx.oneOrNone(this.statement.authenticationGet, { identifier });
364 return auth;
365 } catch (e) {
366 this.logger.error(_scope, 'failed', { error: e, identifier });
367 throw e;
368 }
369 }
370
371
372 async authenticationUpsert(dbCtx, identifier, credential, otpKey) {
373 const _scope = _fileScope('authenticationUpsert');
374 const scrubbedCredential = '*'.repeat((credential || '').length);
375 const scrubbedOTPKey = '*'.repeat((otpKey || '').length) || null;
376 this.logger.debug(_scope, 'called', { identifier, scrubbedCredential, scrubbedOTPKey });
377
378 let result;
379 try {
380 result = await dbCtx.result(this.statement.authenticationUpsert, { identifier, credential, otpKey });
381 if (result.rowCount != 1) {
382 throw new DBErrors.UnexpectedResult('did not upsert authentication');
383 }
384 } catch (e) {
385 this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential, scrubbedOTPKey });
386 throw e;
387 }
388 }
389
390
391 async authenticationUpdateCredential(dbCtx, identifier, credential) {
392 const _scope = _fileScope('authenticationUpdateCredential');
393 const scrubbedCredential = '*'.repeat((credential || '').length);
394 this.logger.debug(_scope, 'called', { identifier, scrubbedCredential });
395
396 let result;
397 try {
398 result = await dbCtx.result(this.statement.authenticationUpdateCredential, { identifier, credential });
399 if (result.rowCount != 1) {
400 throw new DBErrors.UnexpectedResult('did not update authentication credential');
401 }
402 } catch (e) {
403 this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential });
404 throw e;
405 }
406 }
407
408
409 async authenticationUpdateOTPKey(dbCtx, identifier, otpKey) {
410 const _scope = _fileScope('authenticationUpdateOTPKey');
411 const scrubbedOTPKey = '*'.repeat((otpKey || '').length) || null;
412 this.logger.debug(_scope, 'called', { identifier, scrubbedOTPKey });
413
414 let result;
415 try {
416 result = await dbCtx.result(this.statement.authenticationUpdateOtpKey, { identifier, otpKey });
417 if (result.rowCount != 1) {
418 throw new DBErrors.UnexpectedResult('did not update authentication otp key');
419 }
420 } catch (e) {
421 this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedOTPKey });
422 throw e;
423 }
424 }
425
426
427 async subscriptionsByTopicId(dbCtx, topicId) {
428 const _scope = _fileScope('subscriptionsByTopicId');
429 this.logger.debug(_scope, 'called', { topicId });
430
431 let count;
432 try {
433 count = await dbCtx.manyOrNone(this.statement.subscriptionsByTopicId, { topicId });
434 return count;
435 } catch (e) {
436 this.logger.error(_scope, 'failed', { error: e, topicId });
437 throw e;
438 }
439 }
440
441
442 async subscriptionCountByTopicUrl(dbCtx, topicUrl) {
443 const _scope = _fileScope('subscriptionCountByTopicUrl');
444 this.logger.debug(_scope, 'called', { topicUrl });
445
446 let count;
447 try {
448 count = await dbCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl });
449 return count;
450 } catch (e) {
451 this.logger.error(_scope, 'failed', { error: e, topicUrl });
452 throw e;
453 }
454 }
455
456
457 async subscriptionDelete(dbCtx, callback, topicId) {
458 const _scope = _fileScope('subscriptionDelete');
459 this.logger.debug(_scope, 'called', { callback, topicId });
460
461 try {
462 const result = await dbCtx.result(this.statement.subscriptionDelete, { callback, topicId });
463 return this._engineInfo(result);
464 } catch (e) {
465 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
466 throw e;
467 }
468 }
469
470
471 async subscriptionDeleteExpired(dbCtx, topicId) {
472 const _scope = _fileScope('subscriptionDeleteExpired');
473 this.logger.debug(_scope, 'called', { topicId });
474
475 try {
476 const result = await dbCtx.result(this.statement.subscriptionDeleteExpired, { topicId });
477 this.logger.debug(_scope, 'success', { topicId, deleted: result.rowCount });
478 return this._engineInfo(result);
479 } catch (e) {
480 this.logger.error(_scope, 'failed', { error: e, topicId });
481 throw e;
482 }
483 }
484
485
486 async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
487 const _scope = _fileScope('subscriptionDeliveryClaim');
488 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
489
490 try {
491 const claims = await dbCtx.txIf(async (txCtx) => {
492 return txCtx.manyOrNone(this.statement.subscriptionDeliveryClaim, { claimant, wanted, claimTimeoutSeconds });
493 });
494 return claims.map((r) => r.id);
495 } catch (e) {
496 this.logger.error(_scope, 'failed', { error: e, claimant, wanted, claimTimeoutSeconds });
497 throw e;
498 }
499 }
500
501
502 async subscriptionDeliveryClaimById(dbCtx, subscriptionId, claimTimeoutSeconds, claimant) {
503 const _scope = _fileScope('subscriptionDeliveryClaimById');
504 this.logger.debug(_scope, 'called', { subscriptionId, claimTimeoutSeconds, claimant });
505
506 let result;
507 try {
508 result = await dbCtx.txIf(async (txCtx) => {
509 result = await txCtx.result(this.statement.subscriptionDeliveryClaimById, { claimant, subscriptionId, claimTimeoutSeconds });
510 if (result.rowCount != 1) {
511 throw new DBErrors.UnexpectedResult('did not claim subscription delivery');
512 }
513 return result;
514 });
515 return this._engineInfo(result);
516 } catch (e) {
517 this.logger.error(_scope, 'failed', { error: e, claimant, subscriptionId, claimTimeoutSeconds });
518 throw e;
519 }
520 }
521
522
523 async subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) {
524 const _scope = _fileScope('subscriptionDeliveryComplete');
525 this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated });
526
527 let result;
528 try {
529 await dbCtx.txIf(async (txCtx) => {
530 result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId, topicContentUpdated });
531 if (result.rowCount != 1) {
532 throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
533 }
534 result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
535 if (result.rowCount != 1) {
536 throw new DBErrors.UnexpectedResult('did not release subscription delivery');
537 }
538 });
539 } catch (e) {
540 this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated });
541 throw e;
542 }
543 }
544
545
546 async subscriptionDeliveryGone(dbCtx, callback, topicId) {
547 const _scope = _fileScope('subscriptionDeliveryGone');
548 this.logger.debug(_scope, 'called', { callback, topicId });
549
550 let result;
551 try {
552 await dbCtx.txIf(async (txCtx) => {
553 result = await txCtx.result(this.statement.subscriptionDelete, { callback, topicId });
554 if (result.rowCount != 1) {
555 throw new DBErrors.UnexpectedResult('did not delete subscription');
556 }
557 // Delete cascades to delivery
558 // result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
559 // if (result.rowCount != 1) {
560 // throw new DBErrors.UnexpectedResult('did not release subscription delivery');
561 // }
562 });
563 } catch (e) {
564 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
565 throw e;
566 }
567 }
568
569
570 async subscriptionDeliveryIncomplete(dbCtx, callback, topicId, retryDelays = [60]) {
571 const _scope = _fileScope('subscriptionDeliveryIncomplete');
572 this.logger.debug(_scope, 'called', { callback, topicId, retryDelays });
573
574 let result;
575 try {
576 await dbCtx.txIf(async (txCtx) => {
577 const { currentAttempt } = await txCtx.one(this.statement.subscriptionDeliveryAttempts, { callback, topicId });
578 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
579 result = await txCtx.result(this.statement.subscriptionDeliveryFailure, { nextAttemptDelaySeconds, callback, topicId });
580 if (result.rowCount != 1) {
581 throw new DBErrors.UnexpectedResult('did not set subscription delivery failure');
582 }
583 result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
584 if (result.rowCount != 1) {
585 throw new DBErrors.UnexpectedResult('did not release subscription delivery');
586 }
587 });
588 } catch (e) {
589 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
590 throw e;
591 }
592 }
593
594
595 async subscriptionGet(dbCtx, callback, topicId) {
596 const _scope = _fileScope('subscriptionGet');
597 this.logger.debug(_scope, 'called', { callback, topicId });
598
599 let subscription;
600 try {
601 subscription = await dbCtx.oneOrNone(this.statement.subscriptionGet, { callback, topicId });
602 return subscription;
603 } catch (e) {
604 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
605 throw e;
606 }
607 }
608
609
610 async subscriptionGetById(dbCtx, subscriptionId) {
611 const _scope = _fileScope('subscriptionGetById');
612 this.logger.debug(_scope, 'called', { subscriptionId });
613
614 let subscription;
615 try {
616 subscription = await dbCtx.oneOrNone(this.statement.subscriptionGetById, { subscriptionId });
617 return subscription;
618 } catch (e) {
619 this.logger.error(_scope, 'failed', { error: e, subscriptionId });
620 throw e;
621 }
622 }
623
624
625 async subscriptionUpdate(dbCtx, data) {
626 const _scope = _fileScope('subscriptionUpdate');
627 this.logger.debug(_scope, 'called', { data });
628
629 const subscriptionData = {
630 ...data,
631 };
632
633 this._subscriptionUpdateDataValidate(subscriptionData);
634
635 let result;
636 try {
637 result = await dbCtx.result(this.statement.subscriptionUpdate, subscriptionData);
638 if (result.rowCount != 1) {
639 throw new DBErrors.UnexpectedResult('did not update subscription');
640 }
641 } catch (e) {
642 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
643 throw e;
644 }
645 }
646
647
648 async subscriptionUpsert(dbCtx, data) {
649 const _scope = _fileScope('subscriptionUpsert');
650 this.logger.debug(_scope, 'called', { ...data });
651
652 const subscriptionData = {
653 secret: null,
654 httpRemoteAddr: null,
655 httpFrom: null,
656 ...data,
657 };
658 this._subscriptionUpsertDataValidate(subscriptionData);
659
660 let result;
661 try {
662 result = await dbCtx.result(this.statement.subscriptionUpsert, subscriptionData);
663 if (result.rowCount != 1) {
664 throw new DBErrors.UnexpectedResult('did not upsert subscription');
665 }
666 return this._engineInfo(result);
667 } catch (e) {
668 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
669 throw e;
670 }
671 }
672
673
674 async topicDeleted(dbCtx, topicId) {
675 const _scope = _fileScope('topicDeleted');
676 this.logger.debug(_scope, 'called', { topicId });
677
678 let result;
679 try {
680 result = await dbCtx.result(this.statement.topicDeleted, { topicId });
681 if (result.rowCount != 1) {
682 throw new DBErrors.UnexpectedResult('did not update topic as deleted');
683 }
684 } catch (e) {
685 this.logger.error(_scope, 'failed to update topic as deleted', { error: e, topicId });
686 throw e;
687 }
688 }
689
690
691 async topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
692 const _scope = _fileScope('topicFetchClaim');
693 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
694
695 let claims;
696 try {
697 await dbCtx.txIf(async (txCtx) => {
698 claims = await txCtx.manyOrNone(this.statement.topicContentFetchClaim, { claimant, wanted, claimTimeoutSeconds });
699 });
700 return claims.map((r) => r.id);
701 } catch (e) {
702 this.logger.error(_scope, 'failed to claim topics for fetch', { error: e });
703 throw e;
704 }
705 }
706
707
708 async topicFetchClaimById(dbCtx, topicId, claimTimeoutSeconds, claimant) {
709 const _scope = _fileScope('topicFetchClaimById');
710 this.logger.debug(_scope, 'called', { topicId, claimTimeoutSeconds, claimant });
711
712 let result;
713 try {
714 await dbCtx.txIf(async (txCtx) => {
715 result = await txCtx.result(this.statement.topicContentFetchClaimById, { topicId, claimant, claimTimeoutSeconds });
716 });
717 return this._engineInfo(result);
718 } catch (e) {
719 this.logger.error(_scope, 'failed', { error: e, topicId });
720 throw e;
721 }
722 }
723
724
725 async topicFetchComplete(dbCtx, topicId) {
726 const _scope = _fileScope('topicFetchComplete');
727 this.logger.debug(_scope, 'called', { topicId });
728
729 let result;
730 try {
731 await dbCtx.txIf(async (txCtx) => {
732 result = await txCtx.result(this.statement.topicAttemptsReset, { topicId });
733 if (result.rowCount != 1) {
734 throw new DBErrors.UnexpectedResult('did not reset topic attempts');
735 }
736 result = await txCtx.result(this.statement.topicContentFetchDone, { topicId });
737 if (result.rowCount != 1) {
738 throw new DBErrors.UnexpectedResult('did not release topic fetch');
739 }
740 });
741 this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
742 return this._engineInfo(result);
743 } catch (e) {
744 this.logger.error(_scope, 'failed', { error: e, result, topicId });
745 throw e;
746 }
747 }
748
749
750 async topicFetchIncomplete(dbCtx, topicId, retryDelays = [60]) {
751 const _scope = _fileScope('topicFetchIncomplete');
752 this.logger.debug(_scope, 'called', { topicId });
753
754 let result;
755 try {
756 result = await dbCtx.txIf(async (txCtx) => {
757 const { contentFetchAttemptsSinceSuccess: currentAttempt } = await txCtx.one(this.statement.topicAttempts, { topicId });
758 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
759 result = await txCtx.result(this.statement.topicAttemptsIncrement, { topicId, nextAttemptDelaySeconds });
760 if (result.rowCount != 1) {
761 throw new DBErrors.UnexpectedResult('did not set topic attempts');
762 }
763 result = await txCtx.result(this.statement.topicContentFetchDone, { topicId });
764 if (result.rowCount != 1) {
765 throw new DBErrors.UnexpectedResult('did not release topic fetch');
766 }
767 return result;
768 });
769 this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
770 return this._engineInfo(result);
771 } catch (e) {
772 this.logger.error(_scope, 'failed', { error: e, result, topicId });
773 throw e;
774 }
775 }
776
777
778 async topicFetchRequested(dbCtx, topicId) {
779 const _scope = _fileScope('topicFetchRequested');
780 this.logger.debug(_scope, 'called', { topicId });
781
782 let result;
783 try {
784 result = await dbCtx.result(this.statement.topicContentFetchRequested, { topicId });
785 if (result.rowCount != 1) {
786 throw new DBErrors.UnexpectedResult('did not set topic fetch requested');
787 }
788 this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
789 return this._engineInfo(result);
790 } catch (e) {
791 this.logger.error(_scope, 'failed', { error: e, topicId });
792 throw e;
793 }
794 }
795
796
797 async topicGetAll(dbCtx) {
798 const _scope = _fileScope('topicGetAll');
799 this.logger.debug(_scope, 'called');
800
801 let topics;
802 try {
803 topics = await dbCtx.manyOrNone(this.statement.topicGetInfoAll);
804 } catch (e) {
805 this.logger.error(_scope, 'failed', { error: e, topics });
806 throw e;
807 }
808 if (topics) {
809 topics = topics.map(this._topicDefaults.bind(this));
810 }
811 return topics;
812 }
813
814
815 async topicGetById(dbCtx, topicId, applyDefaults = true) {
816 const _scope = _fileScope('topicGetById');
817 this.logger.debug(_scope, 'called', { topicId });
818
819 let topic;
820 try {
821 topic = await dbCtx.oneOrNone(this.statement.topicGetById, { topicId });
822 if (applyDefaults) {
823 topic = this._topicDefaults(topic);
824 }
825 return topic;
826 } catch (e) {
827 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
828 throw e;
829 }
830 }
831
832
833 async topicGetByUrl(dbCtx, topicUrl, applyDefaults = true) {
834 const _scope = _fileScope('topicGetByUrl');
835 this.logger.debug(_scope, 'called', { topicUrl });
836
837 let topic;
838 try {
839 topic = await dbCtx.oneOrNone(this.statement.topicGetByUrl, { topicUrl });
840 if (applyDefaults) {
841 topic = this._topicDefaults(topic);
842 }
843 return topic;
844 } catch (e) {
845 this.logger.error(_scope, 'failed', { error: e, topic, topicUrl });
846 throw e;
847 }
848 }
849
850
851 async topicGetContentById(dbCtx, topicId) {
852 const _scope = _fileScope('topicGetContentById');
853 this.logger.debug(_scope, 'called', { topicId });
854
855 let topic;
856 try {
857 topic = this._cacheGet(topicId);
858 if (topic) {
859 return topic;
860 }
861 topic = await dbCtx.oneOrNone(this.statement.topicGetContentById, { topicId });
862 const topicWithDefaults = this._topicDefaults(topic);
863 this._cacheSet(topicId, topicWithDefaults);
864 return topicWithDefaults;
865 } catch (e) {
866 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
867 throw e;
868 }
869 }
870
871
872 async topicPendingDelete(dbCtx, topicId) {
873 const _scope = _fileScope('topicPendingDelete');
874 this.logger.debug(_scope, 'called', { topicId });
875
876 try {
877 await dbCtx.txIf(async (txCtx) => {
878 const topic = await txCtx.one(this.statement.topicGetById, { topicId });
879 if (!topic.isDeleted) {
880 this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
881 return;
882 }
883
884 const { count: subscriberCount } = await txCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl: topic.url });
885 if (subscriberCount) {
886 this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
887 return;
888 }
889
890 const result = await txCtx.result(this.statement.topicDeleteById, { topicId });
891 if (result.rowCount !== 1) {
892 throw new DBErrors.UnexpectedResult('did not delete topic');
893 }
894 });
895 this.logger.debug(_scope, 'success', { topicId });
896 } catch (e) {
897 this.logger.error(_scope, 'failed', { error: e, topicId });
898 throw e;
899 }
900 }
901
902
903 async topicPublishHistory(dbCtx, topicId, days) {
904 const _scope = _fileScope('topicPublishHistory');
905 this.logger.debug(_scope, 'called', { topicId, days });
906
907 const events = await dbCtx.manyOrNone(this.statement.topicPublishHistory, { topicIds: [topicId], daysAgo: days });
908 const history = Array.from({ length: days }, () => 0);
909 events.forEach(({ daysAgo, contentUpdates }) => history[daysAgo] = Number(contentUpdates));
910
911 return history;
912 }
913
914
915 async topicSet(dbCtx, data) {
916 const _scope = _fileScope('topicSet');
917 this.logger.debug(_scope, 'called', data);
918
919 const topicSetData = {
920 publisherValidationUrl: null,
921 leaseSecondsPreferred: null,
922 leaseSecondsMin: null,
923 leaseSecondsMax: null,
924 ...data,
925 };
926
927 let result;
928 try {
929 this._topicSetDataValidate(topicSetData);
930 result = await dbCtx.result(this.statement.topicUpsert, topicSetData);
931 if (result.rowCount != 1) {
932 throw new DBErrors.UnexpectedResult('did not set topic data');
933 }
934 this.logger.debug(_scope, 'success', { topicSetData, ...this._resultLog(result) });
935 return this._engineInfo(result);
936 } catch (e) {
937 this.logger.error(_scope, 'failed', { error: e, result });
938 throw e;
939 }
940 }
941
942
943 async topicSetContent(dbCtx, data) {
944 const _scope = _fileScope('topicSetContent');
945 const topicSetContentData = {
946 contentType: null,
947 httpETag: null,
948 httpLastModified: null,
949 ...data,
950 };
951 const logData = {
952 ...topicSetContentData,
953 content: common.logTruncate(topicSetContentData.content, 100),
954 };
955 this.logger.debug(_scope, 'called', data);
956
957 let result;
958 try {
959 this._topicSetContentDataValidate(topicSetContentData);
960 result = await dbCtx.result(this.statement.topicSetContent, topicSetContentData);
961 logData.result = this._resultLog(result);
962 if (result.rowCount != 1) {
963 throw new DBErrors.UnexpectedResult('did not set topic content');
964 }
965 result = await dbCtx.result(this.statement.topicSetContentHistory, {
966 topicId: data.topicId,
967 contentHash: data.contentHash,
968 contentSize: data.content.length,
969 });
970 if (result.rowCount != 1) {
971 throw new DBErrors.UnexpectedResult('did not set topic content history');
972 }
973 this.logger.debug(_scope, 'success', { ...logData });
974 return this._engineInfo(result);
975 } catch (e) {
976 this.logger.error(_scope, 'failed', { error: e, ...logData });
977 throw e;
978 }
979 }
980
981
982 async topicUpdate(dbCtx, data) {
983 const _scope = _fileScope('topicUpdate');
984 this.logger.debug(_scope, 'called', { data });
985
986 const topicData = {
987 leaseSecondsPreferred: null,
988 leaseSecondsMin: null,
989 leaseSecondsMax: null,
990 publisherValidationUrl: null,
991 ...data,
992 };
993
994 this._topicUpdateDataValidate(topicData);
995
996 let result;
997 try {
998 result = await dbCtx.result(this.statement.topicUpdate, topicData);
999 if (result.rowCount != 1) {
1000 throw new DBErrors.UnexpectedResult('did not update topic');
1001 }
1002 } catch (e) {
1003 this.logger.error(_scope, 'failed', { error: e, topicData });
1004 throw e;
1005 }
1006 }
1007
1008
1009 async verificationClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
1010 const _scope = _fileScope('verificationClaim');
1011 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
1012
1013 let result;
1014 try {
1015 await dbCtx.txIf(async (txCtx) => {
1016 result = await txCtx.manyOrNone(this.statement.verificationClaim, { claimant, wanted, claimTimeoutSeconds });
1017 });
1018 return result.map((r) => r.id);
1019 } catch (e) {
1020 this.logger.error(_scope, 'failed', { wanted, claimTimeoutSeconds });
1021 throw e;
1022 }
1023 }
1024
1025
1026
1027 async verificationClaimById(dbCtx, verificationId, claimTimeoutSeconds, claimant) {
1028 const _scope = _fileScope('verificationClaimById');
1029 this.logger.debug(_scope, 'called', { verificationId, claimant, claimTimeoutSeconds });
1030
1031 let result;
1032 try {
1033 await dbCtx.txIf(async (txCtx) => {
1034 result = await txCtx.result(this.statement.verificationClaimById, { verificationId, claimant, claimTimeoutSeconds });
1035 });
1036 return this._engineInfo(result);
1037 } catch (e) {
1038 this.logger.error(_scope, 'failed', { verificationId, claimant, claimTimeoutSeconds });
1039 throw e;
1040 }
1041 }
1042
1043
1044 async verificationComplete(dbCtx, verificationId, callback, topicId) {
1045 const _scope = _fileScope('verificationComplete');
1046 this.logger.debug(_scope, 'called', { verificationId });
1047
1048 let result;
1049 try {
1050 await dbCtx.txIf(async (txCtx) => {
1051 result = await txCtx.result(this.statement.verificationScrub, { verificationId, callback, topicId });
1052 if (result.rowCount < 1) {
1053 throw new DBErrors.UnexpectedResult('did not remove verifications');
1054 }
1055 });
1056 } catch (e) {
1057 this.logger.error(_scope, 'failed', { verificationId });
1058 throw e;
1059 }
1060 return this._engineInfo(result);
1061 }
1062
1063
1064 async verificationGetById(dbCtx, verificationId) {
1065 const _scope = _fileScope('verificationGetById');
1066 this.logger.debug(_scope, 'called', { verificationId });
1067
1068 let verification;
1069 try {
1070 verification = await dbCtx.oneOrNone(this.statement.verificationGetById, { verificationId });
1071 return verification;
1072 } catch (e) {
1073 this.logger.error(_scope, 'failed', { error: e, verificationId });
1074 throw e;
1075 }
1076 }
1077
1078
1079 async verificationIncomplete(dbCtx, verificationId, retryDelays = [60]) {
1080 const _scope = _fileScope('verificationIncomplete');
1081 this.logger.debug(_scope, 'called', { verificationId });
1082
1083 let result;
1084 try {
1085 await dbCtx.txIf(async (txCtx) => {
1086 const { attempts } = await txCtx.one(this.statement.verificationAttempts, { verificationId });
1087 const nextAttemptDelaySeconds = common.attemptRetrySeconds(attempts, retryDelays);
1088 result = await txCtx.result(this.statement.verificationAttemptIncrement, { verificationId, nextAttemptDelaySeconds });
1089 if (result.rowCount != 1) {
1090 throw new DBErrors.UnexpectedResult('did not update verification attempts');
1091 }
1092 result = await txCtx.result(this.statement.verificationDone, { verificationId });
1093 if (result.rowCount != 1) {
1094 throw new DBErrors.UnexpectedResult('did not release verification');
1095 }
1096 });
1097 } catch (e) {
1098 this.logger.error(_scope, 'failed', { error: e, verificationId });
1099 throw e;
1100 }
1101 }
1102
1103
1104 async verificationInsert(dbCtx, verification) {
1105 const _scope = _fileScope('verificationInsert');
1106 this.logger.debug(_scope, 'called', { verification });
1107
1108 const verificationData = {
1109 secret: null,
1110 httpRemoteAddr: null,
1111 httpFrom: null,
1112 requestId: null,
1113 ...verification,
1114 };
1115
1116 let result, verificationId;
1117 try {
1118 this._verificationDataValidate(verificationData);
1119 result = await dbCtx.result(this.statement.verificationInsert, verificationData);
1120 if (result.rowCount != 1) {
1121 throw new DBErrors.UnexpectedResult('did not insert verification');
1122 }
1123 verificationId = result.rows[0].id;
1124 this.logger.debug(_scope, 'inserted verification', { verificationId });
1125
1126 return verificationId;
1127 } catch (e) {
1128 this.logger.error(_scope, 'failed', { error: e, verificationData });
1129 throw e;
1130 }
1131 }
1132
1133
1134 async verificationRelease(dbCtx, verificationId) {
1135 const _scope = _fileScope('verificationRelease');
1136 this.logger.debug(_scope, 'called', { verificationId });
1137
1138 let result;
1139 try {
1140 result = await dbCtx.result(this.statement.verificationDone, { verificationId });
1141 if (result.rowCount != 1) {
1142 throw new DBErrors.UnexpectedResult('did not release verification');
1143 }
1144 return this._engineInfo(result);
1145 } catch (e) {
1146 this.logger.error(_scope, 'failed', { error: e, verificationId });
1147 throw e;
1148 }
1149 }
1150
1151
1152 async verificationUpdate(dbCtx, verificationId, data) {
1153 const _scope = _fileScope('verificationUpdate');
1154 this.logger.debug(_scope, 'called', { verificationId, data });
1155
1156 const verificationData = {
1157 reason: null,
1158 verificationId,
1159 ...data,
1160 };
1161
1162 let result;
1163 try {
1164 this._verificationUpdateDataValidate(verificationData);
1165 result = await dbCtx.result(this.statement.verificationUpdate, verificationData);
1166 if (result.rowCount != 1) {
1167 throw new DBErrors.UnexpectedResult('did not update verification');
1168 }
1169 } catch (e) {
1170 this.logger.error(_scope, 'failed', { error: e, verificationData });
1171 throw e;
1172 }
1173 }
1174
1175
1176 async verificationValidated(dbCtx, verificationId) {
1177 const _scope = _fileScope('verificationValidated');
1178 this.logger.debug(_scope, 'called', { verificationId });
1179
1180 let result;
1181 try {
1182 result = await dbCtx.result(this.statement.verificationValidate, { verificationId });
1183 if (result.rowCount != 1) {
1184 throw new DBErrors.UnexpectedResult('did not set verification validation');
1185 }
1186 } catch (e) {
1187 this.logger.error(_scope, 'failed', { error: e, verificationId });
1188 throw e;
1189 }
1190 }
1191
1192 }
1193
1194 module.exports = DatabasePostgres;