database migration 1.0.4, store topic fetch etag/last-modified, provide these when...
[websub-hub] / src / db / postgres / index.js
1 /* eslint-disable security/detect-object-injection */
2 'use strict';
3
4 const pgpInitOptions = {
5 capSQL: true,
6 };
7
8 const path = require('path');
9 const pgp = require('pg-promise')(pgpInitOptions);
10 const svh = require('../schema-version-helper');
11 const Database = require('../base');
12 const DBErrors = require('../errors');
13 const Listener = require('./listener');
14 const common = require('../../common');
15
16 const _fileScope = common.fileScope(__filename);
17
18 const PGTypeIdINT8 = 20; // Type Id 20 == INT8 (BIGINT)
19 const PGTYpeIdINT8Array = 1016; //Type Id 1016 == INT8[] (BIGINT[])
20 pgp.pg.types.setTypeParser(PGTypeIdINT8, BigInt); // Type Id 20 = INT8 (BIGINT)
21 const parseBigIntArray = pgp.pg.types.getTypeParser(PGTYpeIdINT8Array); // Type Id 1016 = INT8[] (BIGINT[])
22 pgp.pg.types.setTypeParser(PGTYpeIdINT8Array, (a) => parseBigIntArray(a).map(BigInt));
23
24 const schemaVersionsSupported = {
25 min: {
26 major: 1,
27 minor: 0,
28 patch: 0,
29 },
30 max: {
31 major: 1,
32 minor: 0,
33 patch: 4,
34 },
35 };
36
37 class DatabasePostgres extends Database {
38 constructor(logger, options, _pgp = pgp) {
39 super(logger, options);
40
41 this.db = _pgp(options.db.connectionString);
42 this.schemaVersionsSupported = schemaVersionsSupported;
43
44 // Suppress QF warnings when running tests
45 this.noWarnings = options.db.noWarnings;
46
47 if (options.db.cacheEnabled) {
48 this.listener = new Listener(logger, this.db, Object.assign({}, options.db.listener, {
49 channel: 'topic_changed',
50 dataCallback: this._topicChanged.bind(this),
51 connectionEstablishedCallback: this._listenerEstablished.bind(this),
52 connectionLostCallback: this._listenerLost.bind(this),
53 }));
54 }
55
56 // Log queries
57 const queryLogLevel = options.db.queryLogLevel;
58 if (queryLogLevel) {
59 pgpInitOptions.query = (event) => {
60 // Quell outgoing pings
61 if (event && event.query && event.query.startsWith('NOTIFY')) {
62 return;
63 }
64 this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event, ['query', 'params']) });
65 };
66 }
67
68 // Log errors
69 pgpInitOptions.error = (err, event) => {
70 this.logger.error(_fileScope('pgp:error'), '', { err, event });
71 };
72
73 // Deophidiate column names in-place, log results
74 pgpInitOptions.receive = (data, result, event) => {
75 const exemplaryRow = data[0];
76 for (const prop in exemplaryRow) {
77 const camel = Database._camelfy(prop);
78 if (!(camel in exemplaryRow)) {
79 for (const d of data) {
80 d[camel] = d[prop];
81 delete d[prop];
82 }
83 }
84 }
85 if (queryLogLevel) {
86 // Quell outgoing pings
87 if (result && result.command === 'NOTIFY') {
88 return;
89 }
90 // Omitting .rows
91 const resultLog = common.pick(result, ['command', 'rowCount', 'duration']);
92 this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog });
93 }
94 };
95
96 // Expose these for test coverage
97 this.pgpInitOptions = pgpInitOptions;
98 this._pgp = _pgp;
99
100 this._initStatements(_pgp);
101 }
102
103
104 _queryFileHelper(_pgp) {
105 return (file) => {
106 const _scope = _fileScope('_queryFile');
107 /* istanbul ignore next */
108 const qfParams = {
109 minify: true,
110 ...(this.noWarnings && { noWarnings: this.noWarnings }),
111 };
112 const qf = new _pgp.QueryFile(file, qfParams);
113 if (qf.error) {
114 this.logger.error(_scope, 'failed to create SQL statement', { error: qf.error, file });
115 throw qf.error;
116 }
117 return qf;
118 };
119 }
120
121
122 async initialize(applyMigrations = true) {
123 const _scope = _fileScope('initialize');
124 this.logger.debug(_scope, 'called', { applyMigrations });
125 if (applyMigrations) {
126 await this._initTables();
127 }
128 await super.initialize();
129 if (this.listener) {
130 await this.listener.start();
131 }
132 }
133
134
135 async _initTables(_pgp) {
136 const _scope = _fileScope('_initTables');
137 this.logger.debug(_scope, 'called', {});
138
139 const _queryFile = this._queryFileHelper(_pgp || this._pgp);
140
141 // Migrations rely upon this table, ensure it exists.
142 const metaVersionTable = '_meta_schema_version';
143
144 const tableExists = async (name) => this.db.oneOrNone('SELECT table_name FROM information_schema.tables WHERE table_name=$(name)', { name });
145 let metaExists = await tableExists(metaVersionTable);
146 if (!metaExists) {
147 const fPath = path.join(__dirname, 'sql', 'schema', 'init.sql');
148 const initSql = _queryFile(fPath);
149 const results = await this.db.multiResult(initSql);
150 this.logger.debug(_scope, 'executed init sql', { results });
151 metaExists = await tableExists(metaVersionTable);
152 /* istanbul ignore if */
153 if (!metaExists) {
154 throw new DBErrors.UnexpectedResult(`did not create ${metaVersionTable} table`);
155 }
156 this.logger.info(_scope, 'created schema version table', { metaVersionTable });
157 }
158
159 // Apply migrations
160 const currentSchema = await this._currentSchema();
161 const migrationsWanted = svh.unappliedSchemaVersions(__dirname, currentSchema, this.schemaVersionsSupported);
162 this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
163 for (const v of migrationsWanted) {
164 const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
165 try {
166 const migrationSql = _queryFile(fPath);
167 this.logger.debug(_scope, 'applying migration', { version: v });
168 const results = await this.db.multiResult(migrationSql);
169 this.logger.debug(_scope, 'migration results', { results });
170 this.logger.info(_scope, 'applied migration', { version: v });
171 } catch (e) {
172 this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v });
173 throw e;
174 }
175 }
176 }
177
178
179 _initStatements(_pgp) {
180 const _scope = _fileScope('_initStatements');
181 const _queryFile = this._queryFileHelper(_pgp);
182 this.statement = _pgp.utils.enumSql(path.join(__dirname, 'sql'), {}, _queryFile);
183 this.logger.debug(_scope, 'statements initialized', { statements: Object.keys(this.statement).length });
184 }
185
186
187 async healthCheck() {
188 const _scope = _fileScope('healthCheck');
189 this.logger.debug(_scope, 'called', {});
190 const c = await this.db.connect();
191 c.done();
192 return { serverVersion: c.client.serverVersion };
193 }
194
195
196 async _currentSchema() {
197 return this.db.one('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1');
198 }
199
200
201 async _closeConnection() {
202 const _scope = _fileScope('_closeConnection');
203 try {
204 if (this.listener) {
205 await this.listener.stop();
206 }
207 await this._pgp.end();
208 } catch (e) {
209 this.logger.error(_scope, 'failed', { error: e });
210 throw e;
211 }
212 }
213
214
215 /* istanbul ignore next */
216 async _purgeTables(really = false) {
217 const _scope = _fileScope('_purgeTables');
218 try {
219 if (really) {
220 await this.db.tx(async (t) => {
221 await t.batch([
222 'topic',
223 // 'topic_fetch_in_progress',
224 // 'verification',
225 // 'verification_in_progress',
226 // 'subscription',
227 // 'subscription_delivery_in_progress',
228 ].map(async (table) => t.query('TRUNCATE TABLE $(table:name) CASCADE', { table })));
229 });
230 }
231 } catch (e) {
232 this.logger.error(_scope, 'failed', { error: e });
233 throw e;
234 }
235 }
236
237
238 // eslint-disable-next-line class-methods-use-this
239 _engineInfo(result) {
240 return {
241 changes: result.rowCount,
242 lastInsertRowid: result.rows.length ? result.rows[0].id : undefined,
243 duration: result.duration,
244 };
245 }
246
247
248 // eslint-disable-next-line class-methods-use-this
249 _resultLog(result) {
250 return common.pick(result, ['command', 'rowCount', 'duration']);
251 }
252
253
254 /**
255 * Receive notices when topic entry is updated.
256 * Clear relevant cache entry.
257 * @param {String} payload
258 */
259 _topicChanged(payload) {
260 const _scope = _fileScope('_topicChanged');
261 if (payload !== 'ping') {
262 this.logger.debug(_scope, 'called', { payload });
263 this.cache.delete(payload);
264 }
265 }
266
267
268 /**
269 * Called when a listener connection is opened.
270 * Enable cache.
271 */
272 _listenerEstablished() {
273 const _scope = _fileScope('_listenerEstablished');
274 this.logger.debug(_scope, 'called', {});
275 this.cache = new Map();
276 }
277
278
279 /**
280 * Called when a listener connection is closed.
281 * Disable cache.
282 */
283 _listenerLost() {
284 const _scope = _fileScope('_listenerLost');
285 this.logger.debug(_scope, 'called', {});
286 delete this.cache;
287 }
288
289
290 /**
291 * Return a cached entry, if available.
292 * @param {*} key
293 */
294 _cacheGet(key) {
295 const _scope = _fileScope('_cacheGet');
296 if (this.cache && this.cache.has(key)) {
297 const cacheEntry = this.cache.get(key);
298 this.logger.debug(_scope, 'found cache entry', { key, ...common.pick(cacheEntry, ['added', 'hits', 'lastHit']) });
299 cacheEntry.hits += 1;
300 cacheEntry.lastHit = new Date();
301 return cacheEntry.data;
302 }
303 }
304
305
306 /**
307 * Store an entry in cache, if available.
308 * @param {*} key
309 * @param {*} data
310 */
311 _cacheSet(key, data) {
312 const _scope = _fileScope('_cacheSet');
313 if (this.cache) {
314 this.cache.set(key, {
315 added: new Date(),
316 hits: 0,
317 lastHit: undefined,
318 data,
319 });
320 this.logger.debug(_scope, 'added cache entry', { key });
321 }
322 }
323
324
325 async context(fn) {
326 return this.db.task(async (t) => fn(t));
327 }
328
329
330 // eslint-disable-next-line class-methods-use-this
331 async transaction(dbCtx, fn) {
332 return dbCtx.txIf(async (t) => fn(t));
333 }
334
335
336 async authenticationSuccess(dbCtx, identifier) {
337 const _scope = _fileScope('authenticationSuccess');
338 this.logger.debug(_scope, 'called', { identifier });
339
340 let result;
341 try {
342 result = await dbCtx.result(this.statement.authenticationSuccess, { identifier });
343 if (result.rowCount != 1) {
344 throw new DBErrors.UnexpectedResult('did not update authentication success event');
345 }
346 } catch (e) {
347 this.logger.error(_scope, 'failed', { error: e, identifier });
348 throw e;
349 }
350 }
351
352
353 async authenticationGet(dbCtx, identifier) {
354 const _scope = _fileScope('authenticationGet');
355 this.logger.debug(_scope, 'called', { identifier });
356
357 let auth;
358 try {
359 auth = await dbCtx.oneOrNone(this.statement.authenticationGet, { identifier });
360 return auth;
361 } catch (e) {
362 this.logger.error(_scope, 'failed', { error: e, identifier });
363 throw e;
364 }
365 }
366
367
368 async authenticationUpsert(dbCtx, identifier, credential) {
369 const _scope = _fileScope('authenticationUpsert');
370 const scrubbedCredential = '*'.repeat((credential || '').length);
371 this.logger.debug(_scope, 'called', { identifier, scrubbedCredential });
372
373 let result;
374 try {
375 result = await dbCtx.result(this.statement.authenticationUpsert, { identifier, credential });
376 if (result.rowCount != 1) {
377 throw new DBErrors.UnexpectedResult('did not upsert authentication');
378 }
379 } catch (e) {
380 this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential })
381 throw e;
382 }
383 }
384
385
386 async subscriptionsByTopicId(dbCtx, topicId) {
387 const _scope = _fileScope('subscriptionsByTopicId');
388 this.logger.debug(_scope, 'called', { topicId });
389
390 let count;
391 try {
392 count = await dbCtx.manyOrNone(this.statement.subscriptionsByTopicId, { topicId });
393 return count;
394 } catch (e) {
395 this.logger.error(_scope, 'failed', { error: e, topicId });
396 throw e;
397 }
398 }
399
400
401 async subscriptionCountByTopicUrl(dbCtx, topicUrl) {
402 const _scope = _fileScope('subscriptionCountByTopicUrl');
403 this.logger.debug(_scope, 'called', { topicUrl });
404
405 let count;
406 try {
407 count = await dbCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl });
408 return count;
409 } catch (e) {
410 this.logger.error(_scope, 'failed', { error: e, topicUrl });
411 throw e;
412 }
413 }
414
415
416 async subscriptionDelete(dbCtx, callback, topicId) {
417 const _scope = _fileScope('subscriptionDelete');
418 this.logger.debug(_scope, 'called', { callback, topicId });
419
420 try {
421 const result = await dbCtx.result(this.statement.subscriptionDelete, { callback, topicId });
422 return this._engineInfo(result);
423 } catch (e) {
424 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
425 throw e;
426 }
427 }
428
429
430 async subscriptionDeleteExpired(dbCtx, topicId) {
431 const _scope = _fileScope('subscriptionDeleteExpired');
432 this.logger.debug(_scope, 'called', { topicId });
433
434 try {
435 const result = await dbCtx.result(this.statement.subscriptionDeleteExpired, { topicId });
436 this.logger.debug(_scope, 'success', { topicId, deleted: result.rowCount });
437 return this._engineInfo(result);
438 } catch (e) {
439 this.logger.error(_scope, 'failed', { error: e, topicId });
440 throw e;
441 }
442 }
443
444
445 async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
446 const _scope = _fileScope('subscriptionDeliveryClaim');
447 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
448
449 try {
450 const claims = await dbCtx.txIf(async (txCtx) => {
451 return txCtx.manyOrNone(this.statement.subscriptionDeliveryClaim, { claimant, wanted, claimTimeoutSeconds });
452 });
453 return claims.map((r) => r.id);
454 } catch (e) {
455 this.logger.error(_scope, 'failed', { error: e, claimant, wanted, claimTimeoutSeconds });
456 throw e;
457 }
458 }
459
460
461 async subscriptionDeliveryClaimById(dbCtx, subscriptionId, claimTimeoutSeconds, claimant) {
462 const _scope = _fileScope('subscriptionDeliveryClaimById');
463 this.logger.debug(_scope, 'called', { subscriptionId, claimTimeoutSeconds, claimant });
464
465 let result;
466 try {
467 result = await dbCtx.txIf(async (txCtx) => {
468 result = await txCtx.result(this.statement.subscriptionDeliveryClaimById, { claimant, subscriptionId, claimTimeoutSeconds });
469 if (result.rowCount != 1) {
470 throw new DBErrors.UnexpectedResult('did not claim subscription delivery');
471 }
472 return result;
473 });
474 return this._engineInfo(result);
475 } catch (e) {
476 this.logger.error(_scope, 'failed', { error: e, claimant, subscriptionId, claimTimeoutSeconds });
477 throw e;
478 }
479 }
480
481
482 async subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) {
483 const _scope = _fileScope('subscriptionDeliveryComplete');
484 this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated });
485
486 let result;
487 try {
488 await dbCtx.txIf(async (txCtx) => {
489 result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId, topicContentUpdated });
490 if (result.rowCount != 1) {
491 throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
492 }
493 result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
494 if (result.rowCount != 1) {
495 throw new DBErrors.UnexpectedResult('did not release subscription delivery');
496 }
497 });
498 } catch (e) {
499 this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated });
500 throw e;
501 }
502 }
503
504
505 async subscriptionDeliveryGone(dbCtx, callback, topicId) {
506 const _scope = _fileScope('subscriptionDeliveryGone');
507 this.logger.debug(_scope, 'called', { callback, topicId });
508
509 let result;
510 try {
511 await dbCtx.txIf(async (txCtx) => {
512 result = await txCtx.result(this.statement.subscriptionDelete, { callback, topicId });
513 if (result.rowCount != 1) {
514 throw new DBErrors.UnexpectedResult('did not delete subscription');
515 }
516 // Delete cascades to delivery
517 // result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
518 // if (result.rowCount != 1) {
519 // throw new DBErrors.UnexpectedResult('did not release subscription delivery');
520 // }
521 });
522 } catch (e) {
523 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
524 throw e;
525 }
526 }
527
528
529 async subscriptionDeliveryIncomplete(dbCtx, callback, topicId, retryDelays = [60]) {
530 const _scope = _fileScope('subscriptionDeliveryIncomplete');
531 this.logger.debug(_scope, 'called', { callback, topicId, retryDelays });
532
533 let result;
534 try {
535 await dbCtx.txIf(async (txCtx) => {
536 const { currentAttempt } = await txCtx.one(this.statement.subscriptionDeliveryAttempts, { callback, topicId });
537 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
538 result = await txCtx.result(this.statement.subscriptionDeliveryFailure, { nextAttemptDelaySeconds, callback, topicId });
539 if (result.rowCount != 1) {
540 throw new DBErrors.UnexpectedResult('did not set subscription delivery failure');
541 }
542 result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
543 if (result.rowCount != 1) {
544 throw new DBErrors.UnexpectedResult('did not release subscription delivery');
545 }
546 });
547 } catch (e) {
548 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
549 throw e;
550 }
551 }
552
553
554 async subscriptionGet(dbCtx, callback, topicId) {
555 const _scope = _fileScope('subscriptionGet');
556 this.logger.debug(_scope, 'called', { callback, topicId });
557
558 let subscription;
559 try {
560 subscription = await dbCtx.oneOrNone(this.statement.subscriptionGet, { callback, topicId });
561 return subscription;
562 } catch (e) {
563 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
564 throw e;
565 }
566 }
567
568
569 async subscriptionGetById(dbCtx, subscriptionId) {
570 const _scope = _fileScope('subscriptionGetById');
571 this.logger.debug(_scope, 'called', { subscriptionId });
572
573 let subscription;
574 try {
575 subscription = await dbCtx.oneOrNone(this.statement.subscriptionGetById, { subscriptionId });
576 return subscription;
577 } catch (e) {
578 this.logger.error(_scope, 'failed', { error: e, subscriptionId });
579 throw e;
580 }
581 }
582
583
584 async subscriptionUpdate(dbCtx, data) {
585 const _scope = _fileScope('subscriptionUpdate');
586 this.logger.debug(_scope, 'called', { data });
587
588 const subscriptionData = {
589 ...data,
590 };
591
592 this._subscriptionUpdateDataValidate(subscriptionData);
593
594 let result;
595 try {
596 result = await dbCtx.result(this.statement.subscriptionUpdate, subscriptionData);
597 if (result.rowCount != 1) {
598 throw new DBErrors.UnexpectedResult('did not update subscription');
599 }
600 } catch (e) {
601 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
602 throw e;
603 }
604 }
605
606
607 async subscriptionUpsert(dbCtx, data) {
608 const _scope = _fileScope('subscriptionUpsert');
609 this.logger.debug(_scope, 'called', { ...data });
610
611 const subscriptionData = {
612 secret: null,
613 httpRemoteAddr: null,
614 httpFrom: null,
615 ...data,
616 };
617 this._subscriptionUpsertDataValidate(subscriptionData);
618
619 let result;
620 try {
621 result = await dbCtx.result(this.statement.subscriptionUpsert, subscriptionData);
622 if (result.rowCount != 1) {
623 throw new DBErrors.UnexpectedResult('did not upsert subscription');
624 }
625 return this._engineInfo(result);
626 } catch (e) {
627 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
628 throw e;
629 }
630 }
631
632
633 async topicDeleted(dbCtx, topicId) {
634 const _scope = _fileScope('topicDeleted');
635 this.logger.debug(_scope, 'called', { topicId });
636
637 let result;
638 try {
639 result = await dbCtx.result(this.statement.topicDeleted, { topicId });
640 if (result.rowCount != 1) {
641 throw new DBErrors.UnexpectedResult('did not update topic as deleted');
642 }
643 } catch (e) {
644 this.logger.error(_scope, 'failed to update topic as deleted', { error: e, topicId });
645 throw e;
646 }
647 }
648
649
650 async topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
651 const _scope = _fileScope('topicFetchClaim');
652 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
653
654 let claims;
655 try {
656 await dbCtx.txIf(async (txCtx) => {
657 claims = await txCtx.manyOrNone(this.statement.topicContentFetchClaim, { claimant, wanted, claimTimeoutSeconds });
658 });
659 return claims.map((r) => r.id);
660 } catch (e) {
661 this.logger.error(_scope, 'failed to claim topics for fetch', { error: e });
662 throw e;
663 }
664 }
665
666
667 async topicFetchClaimById(dbCtx, topicId, claimTimeoutSeconds, claimant) {
668 const _scope = _fileScope('topicFetchClaimById');
669 this.logger.debug(_scope, 'called', { topicId, claimTimeoutSeconds, claimant });
670
671 let result;
672 try {
673 await dbCtx.txIf(async (txCtx) => {
674 result = await txCtx.result(this.statement.topicContentFetchClaimById, { topicId, claimant, claimTimeoutSeconds });
675 });
676 return this._engineInfo(result);
677 } catch (e) {
678 this.logger.error(_scope, 'failed', { error: e, topicId });
679 throw e;
680 }
681 }
682
683
684 async topicFetchComplete(dbCtx, topicId) {
685 const _scope = _fileScope('topicFetchComplete');
686 this.logger.debug(_scope, 'called', { topicId });
687
688 let result;
689 try {
690 await dbCtx.txIf(async (txCtx) => {
691 result = await txCtx.result(this.statement.topicAttemptsReset, { topicId });
692 if (result.rowCount != 1) {
693 throw new DBErrors.UnexpectedResult('did not reset topic attempts');
694 }
695 result = await txCtx.result(this.statement.topicContentFetchDone, { topicId });
696 if (result.rowCount != 1) {
697 throw new DBErrors.UnexpectedResult('did not release topic fetch');
698 }
699 });
700 this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
701 return this._engineInfo(result);
702 } catch (e) {
703 this.logger.error(_scope, 'failed', { error: e, result, topicId });
704 throw e;
705 }
706 }
707
708
709 async topicFetchIncomplete(dbCtx, topicId, retryDelays = [60]) {
710 const _scope = _fileScope('topicFetchIncomplete');
711 this.logger.debug(_scope, 'called', { topicId });
712
713 let result;
714 try {
715 result = await dbCtx.txIf(async (txCtx) => {
716 const { contentFetchAttemptsSinceSuccess: currentAttempt } = await txCtx.one(this.statement.topicAttempts, { topicId });
717 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
718 result = await txCtx.result(this.statement.topicAttemptsIncrement, { topicId, nextAttemptDelaySeconds });
719 if (result.rowCount != 1) {
720 throw new DBErrors.UnexpectedResult('did not set topic attempts');
721 }
722 result = await txCtx.result(this.statement.topicContentFetchDone, { topicId });
723 if (result.rowCount != 1) {
724 throw new DBErrors.UnexpectedResult('did not release topic fetch');
725 }
726 return result;
727 });
728 this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
729 return this._engineInfo(result);
730 } catch (e) {
731 this.logger.error(_scope, 'failed', { error: e, result, topicId });
732 throw e;
733 }
734 }
735
736
737 async topicFetchRequested(dbCtx, topicId) {
738 const _scope = _fileScope('topicFetchRequested');
739 this.logger.debug(_scope, 'called', { topicId });
740
741 let result;
742 try {
743 result = await dbCtx.result(this.statement.topicContentFetchRequested, { topicId });
744 if (result.rowCount != 1) {
745 throw new DBErrors.UnexpectedResult('did not set topic fetch requested');
746 }
747 this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
748 return this._engineInfo(result);
749 } catch (e) {
750 this.logger.error(_scope, 'failed', { error: e, topicId });
751 throw e;
752 }
753 }
754
755
756 async topicGetAll(dbCtx) {
757 const _scope = _fileScope('topicGetAll');
758 this.logger.debug(_scope, 'called');
759
760 let topics;
761 try {
762 topics = await dbCtx.manyOrNone(this.statement.topicGetInfoAll);
763 } catch (e) {
764 this.logger.error(_scope, 'failed', { error: e, topics });
765 throw e;
766 }
767 if (topics) {
768 topics = topics.map(this._topicDefaults.bind(this));
769 }
770 return topics;
771 }
772
773
774 async topicGetById(dbCtx, topicId, applyDefaults = true) {
775 const _scope = _fileScope('topicGetById');
776 this.logger.debug(_scope, 'called', { topicId });
777
778 let topic;
779 try {
780 topic = await dbCtx.oneOrNone(this.statement.topicGetById, { topicId });
781 if (applyDefaults) {
782 topic = this._topicDefaults(topic);
783 }
784 return topic;
785 } catch (e) {
786 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
787 throw e;
788 }
789 }
790
791
792 async topicGetByUrl(dbCtx, topicUrl) {
793 const _scope = _fileScope('topicGetByUrl');
794 this.logger.debug(_scope, 'called', { topicUrl });
795
796 let topic;
797 try {
798 topic = await dbCtx.oneOrNone(this.statement.topicGetByUrl, { topicUrl });
799 return this._topicDefaults(topic);
800 } catch (e) {
801 this.logger.error(_scope, 'failed', { error: e, topic, topicUrl });
802 throw e;
803 }
804 }
805
806
807 async topicGetContentById(dbCtx, topicId) {
808 const _scope = _fileScope('topicGetContentById');
809 this.logger.debug(_scope, 'called', { topicId });
810
811 let topic;
812 try {
813 topic = this._cacheGet(topicId);
814 if (topic) {
815 return topic;
816 }
817 topic = await dbCtx.oneOrNone(this.statement.topicGetContentById, { topicId });
818 const topicWithDefaults = this._topicDefaults(topic);
819 this._cacheSet(topicId, topicWithDefaults);
820 return topicWithDefaults;
821 } catch (e) {
822 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
823 throw e;
824 }
825 }
826
827
828 async topicPendingDelete(dbCtx, topicId) {
829 const _scope = _fileScope('topicPendingDelete');
830 this.logger.debug(_scope, 'called', { topicId });
831
832 try {
833 await dbCtx.txIf(async (txCtx) => {
834 const topic = await txCtx.one(this.statement.topicGetById, { topicId });
835 if (!topic.isDeleted) {
836 this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
837 return;
838 }
839
840 const { count: subscriberCount } = await txCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl: topic.url });
841 if (subscriberCount) {
842 this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
843 return;
844 }
845
846 const result = await txCtx.result(this.statement.topicDeleteById, { topicId });
847 if (result.rowCount !== 1) {
848 throw new DBErrors.UnexpectedResult('did not delete topic');
849 }
850 });
851 this.logger.debug(_scope, 'success', { topicId });
852 } catch (e) {
853 this.logger.error(_scope, 'failed', { error: e, topicId });
854 throw e;
855 }
856 }
857
858
859 async topicPublishHistory(dbCtx, topicId, days) {
860 const _scope = _fileScope('topicPublishHistory');
861 this.logger.debug(_scope, 'called', { topicId, days });
862
863 const events = await dbCtx.manyOrNone(this.statement.topicPublishHistory, { topicIds: [topicId], daysAgo: days });
864 const history = Array.from({ length: days }, () => 0);
865 events.forEach(({ daysAgo, contentUpdates }) => history[daysAgo] = Number(contentUpdates));
866
867 return history;
868 }
869
870
871 async topicSet(dbCtx, data) {
872 const _scope = _fileScope('topicSet');
873 this.logger.debug(_scope, 'called', data);
874
875 const topicSetData = {
876 publisherValidationUrl: null,
877 leaseSecondsPreferred: null,
878 leaseSecondsMin: null,
879 leaseSecondsMax: null,
880 ...data,
881 };
882
883 let result;
884 try {
885 this._topicSetDataValidate(topicSetData);
886 result = await dbCtx.result(this.statement.topicUpsert, topicSetData);
887 if (result.rowCount != 1) {
888 throw new DBErrors.UnexpectedResult('did not set topic data');
889 }
890 this.logger.debug(_scope, 'success', { topicSetData, ...this._resultLog(result) });
891 return this._engineInfo(result);
892 } catch (e) {
893 this.logger.error(_scope, 'failed', { error: e, result });
894 throw e;
895 }
896 }
897
898
899 async topicSetContent(dbCtx, data) {
900 const _scope = _fileScope('topicSetContent');
901 const topicSetContentData = {
902 contentType: null,
903 httpETag: null,
904 httpLastModified: null,
905 ...data,
906 };
907 const logData = {
908 ...topicSetContentData,
909 content: common.logTruncate(topicSetContentData.content, 100),
910 };
911 this.logger.debug(_scope, 'called', data);
912
913 let result;
914 try {
915 this._topicSetContentDataValidate(topicSetContentData);
916 result = await dbCtx.result(this.statement.topicSetContent, topicSetContentData);
917 logData.result = this._resultLog(result);
918 if (result.rowCount != 1) {
919 throw new DBErrors.UnexpectedResult('did not set topic content');
920 }
921 result = await dbCtx.result(this.statement.topicSetContentHistory, {
922 topicId: data.topicId,
923 contentHash: data.contentHash,
924 contentSize: data.content.length,
925 });
926 if (result.rowCount != 1) {
927 throw new DBErrors.UnexpectedResult('did not set topic content history');
928 }
929 this.logger.debug(_scope, 'success', { ...logData });
930 return this._engineInfo(result);
931 } catch (e) {
932 this.logger.error(_scope, 'failed', { error: e, ...logData });
933 throw e;
934 }
935 }
936
937
938 async topicUpdate(dbCtx, data) {
939 const _scope = _fileScope('topicUpdate');
940 this.logger.debug(_scope, 'called', { data });
941
942 const topicData = {
943 leaseSecondsPreferred: null,
944 leaseSecondsMin: null,
945 leaseSecondsMax: null,
946 publisherValidationUrl: null,
947 ...data,
948 };
949
950 this._topicUpdateDataValidate(topicData);
951
952 let result;
953 try {
954 result = await dbCtx.result(this.statement.topicUpdate, topicData);
955 if (result.rowCount != 1) {
956 throw new DBErrors.UnexpectedResult('did not update topic');
957 }
958 } catch (e) {
959 this.logger.error(_scope, 'failed', { error: e, topicData });
960 throw e;
961 }
962 }
963
964
965 async verificationClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
966 const _scope = _fileScope('verificationClaim');
967 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
968
969 let result;
970 try {
971 await dbCtx.txIf(async (txCtx) => {
972 result = await txCtx.manyOrNone(this.statement.verificationClaim, { claimant, wanted, claimTimeoutSeconds });
973 });
974 return result.map((r) => r.id);
975 } catch (e) {
976 this.logger.error(_scope, 'failed', { wanted, claimTimeoutSeconds });
977 throw e;
978 }
979 }
980
981
982
983 async verificationClaimById(dbCtx, verificationId, claimTimeoutSeconds, claimant) {
984 const _scope = _fileScope('verificationClaimById');
985 this.logger.debug(_scope, 'called', { verificationId, claimant, claimTimeoutSeconds });
986
987 let result;
988 try {
989 await dbCtx.txIf(async (txCtx) => {
990 result = await txCtx.result(this.statement.verificationClaimById, { verificationId, claimant, claimTimeoutSeconds });
991 });
992 return this._engineInfo(result);
993 } catch (e) {
994 this.logger.error(_scope, 'failed', { verificationId, claimant, claimTimeoutSeconds });
995 throw e;
996 }
997 }
998
999
1000 async verificationComplete(dbCtx, verificationId, callback, topicId) {
1001 const _scope = _fileScope('verificationComplete');
1002 this.logger.debug(_scope, 'called', { verificationId });
1003
1004 let result;
1005 try {
1006 await dbCtx.txIf(async (txCtx) => {
1007 result = await txCtx.result(this.statement.verificationScrub, { verificationId, callback, topicId });
1008 if (result.rowCount < 1) {
1009 throw new DBErrors.UnexpectedResult('did not remove verifications');
1010 }
1011 });
1012 } catch (e) {
1013 this.logger.error(_scope, 'failed', { verificationId });
1014 throw e;
1015 }
1016 return this._engineInfo(result);
1017 }
1018
1019
1020 async verificationGetById(dbCtx, verificationId) {
1021 const _scope = _fileScope('verificationGetById');
1022 this.logger.debug(_scope, 'called', { verificationId });
1023
1024 let verification;
1025 try {
1026 verification = await dbCtx.oneOrNone(this.statement.verificationGetById, { verificationId });
1027 return verification;
1028 } catch (e) {
1029 this.logger.error(_scope, 'failed', { error: e, verificationId });
1030 throw e;
1031 }
1032 }
1033
1034
1035 async verificationIncomplete(dbCtx, verificationId, retryDelays = [60]) {
1036 const _scope = _fileScope('verificationIncomplete');
1037 this.logger.debug(_scope, 'called', { verificationId });
1038
1039 let result;
1040 try {
1041 await dbCtx.txIf(async (txCtx) => {
1042 const { attempts } = await txCtx.one(this.statement.verificationAttempts, { verificationId });
1043 const nextAttemptDelaySeconds = common.attemptRetrySeconds(attempts, retryDelays);
1044 result = await txCtx.result(this.statement.verificationAttemptIncrement, { verificationId, nextAttemptDelaySeconds });
1045 if (result.rowCount != 1) {
1046 throw new DBErrors.UnexpectedResult('did not update verification attempts');
1047 }
1048 result = await txCtx.result(this.statement.verificationDone, { verificationId });
1049 if (result.rowCount != 1) {
1050 throw new DBErrors.UnexpectedResult('did not release verification');
1051 }
1052 });
1053 } catch (e) {
1054 this.logger.error(_scope, 'failed', { error: e, verificationId });
1055 throw e;
1056 }
1057 }
1058
1059
1060 async verificationInsert(dbCtx, verification) {
1061 const _scope = _fileScope('verificationInsert');
1062 this.logger.debug(_scope, 'called', { verification });
1063
1064 const verificationData = {
1065 secret: null,
1066 httpRemoteAddr: null,
1067 httpFrom: null,
1068 requestId: null,
1069 ...verification,
1070 };
1071
1072 let result, verificationId;
1073 try {
1074 this._verificationDataValidate(verificationData);
1075 result = await dbCtx.result(this.statement.verificationInsert, verificationData);
1076 if (result.rowCount != 1) {
1077 throw new DBErrors.UnexpectedResult('did not insert verification');
1078 }
1079 verificationId = result.rows[0].id;
1080 this.logger.debug(_scope, 'inserted verification', { verificationId });
1081
1082 return verificationId;
1083 } catch (e) {
1084 this.logger.error(_scope, 'failed', { error: e, verificationData });
1085 throw e;
1086 }
1087 }
1088
1089
1090 async verificationRelease(dbCtx, verificationId) {
1091 const _scope = _fileScope('verificationRelease');
1092 this.logger.debug(_scope, 'called', { verificationId });
1093
1094 let result;
1095 try {
1096 result = await dbCtx.result(this.statement.verificationDone, { verificationId });
1097 if (result.rowCount != 1) {
1098 throw new DBErrors.UnexpectedResult('did not release verification');
1099 }
1100 return this._engineInfo(result);
1101 } catch (e) {
1102 this.logger.error(_scope, 'failed', { error: e, verificationId });
1103 throw e;
1104 }
1105 }
1106
1107
1108 async verificationUpdate(dbCtx, verificationId, data) {
1109 const _scope = _fileScope('verificationUpdate');
1110 this.logger.debug(_scope, 'called', { verificationId, data });
1111
1112 const verificationData = {
1113 reason: null,
1114 verificationId,
1115 ...data,
1116 };
1117
1118 let result;
1119 try {
1120 this._verificationUpdateDataValidate(verificationData);
1121 result = await dbCtx.result(this.statement.verificationUpdate, verificationData);
1122 if (result.rowCount != 1) {
1123 throw new DBErrors.UnexpectedResult('did not update verification');
1124 }
1125 } catch (e) {
1126 this.logger.error(_scope, 'failed', { error: e, verificationData });
1127 throw e;
1128 }
1129 }
1130
1131
1132 async verificationValidated(dbCtx, verificationId) {
1133 const _scope = _fileScope('verificationValidated');
1134 this.logger.debug(_scope, 'called', { verificationId });
1135
1136 let result;
1137 try {
1138 result = await dbCtx.result(this.statement.verificationValidate, { verificationId });
1139 if (result.rowCount != 1) {
1140 throw new DBErrors.UnexpectedResult('did not set verification validation');
1141 }
1142 } catch (e) {
1143 this.logger.error(_scope, 'failed', { error: e, verificationId });
1144 throw e;
1145 }
1146 }
1147
1148 }
1149
1150 module.exports = DatabasePostgres;