use got instead of axios, some cleanup, problem with async context being lost for...
[websub-hub] / src / db / postgres / index.js
1 /* eslint-disable security/detect-object-injection */
2 'use strict';
3
4 const pgpInitOptions = {
5 capSQL: true,
6 };
7
8 const path = require('path');
9 const pgp = require('pg-promise')(pgpInitOptions);
10 const svh = require('../schema-version-helper');
11 const Database = require('../base');
12 const DBErrors = require('../errors');
13 const Listener = require('./listener');
14 const common = require('../../common');
15
16 const _fileScope = common.fileScope(__filename);
17
18 const PGTypeIdINT8 = 20; // Type Id 20 == INT8 (BIGINT)
19 const PGTYpeIdINT8Array = 1016; //Type Id 1016 == INT8[] (BIGINT[])
20 pgp.pg.types.setTypeParser(PGTypeIdINT8, BigInt); // Type Id 20 = INT8 (BIGINT)
21 const parseBigIntArray = pgp.pg.types.getTypeParser(PGTYpeIdINT8Array); // Type Id 1016 = INT8[] (BIGINT[])
22 pgp.pg.types.setTypeParser(PGTYpeIdINT8Array, (a) => parseBigIntArray(a).map(BigInt));
23
24 const schemaVersionsSupported = {
25 min: {
26 major: 1,
27 minor: 0,
28 patch: 0,
29 },
30 max: {
31 major: 1,
32 minor: 0,
33 patch: 4,
34 },
35 };
36
37 class DatabasePostgres extends Database {
38 constructor(logger, options, _pgp = pgp) {
39 super(logger, options);
40
41 this.db = _pgp(options.db.connectionString);
42 this.schemaVersionsSupported = schemaVersionsSupported;
43
44 // Suppress QF warnings when running tests
45 this.noWarnings = options.db.noWarnings;
46
47 if (options.db.cacheEnabled) {
48 this.listener = new Listener(logger, this.db, Object.assign({}, options.db.listener, {
49 channel: 'topic_changed',
50 dataCallback: this._topicChanged.bind(this),
51 connectionEstablishedCallback: this._listenerEstablished.bind(this),
52 connectionLostCallback: this._listenerLost.bind(this),
53 }));
54 }
55
56 // Log queries
57 const queryLogLevel = options.db.queryLogLevel;
58 if (queryLogLevel) {
59 pgpInitOptions.query = (event) => {
60 // Quell outgoing pings
61 if (event && event.query && event.query.startsWith('NOTIFY')) {
62 return;
63 }
64 this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event || {}, ['query', 'params']) });
65 };
66 }
67
68 // Log errors
69 pgpInitOptions.error = (err, event) => {
70 this.logger.error(_fileScope('pgp:error'), '', { err, event });
71
72 // TODO: close connection on err.code === '57P03' database shutting down
73 };
74
75 // Deophidiate column names in-place, log results
76 pgpInitOptions.receive = ({ data, result, ctx: event }) => {
77 const exemplaryRow = data[0];
78 for (const prop in exemplaryRow) {
79 const camel = Database._camelfy(prop);
80 if (!(camel in exemplaryRow)) {
81 for (const d of data) {
82 d[camel] = d[prop];
83 delete d[prop];
84 }
85 }
86 }
87 if (queryLogLevel) {
88 // Quell outgoing pings
89 if (result && result.command === 'NOTIFY') {
90 return;
91 }
92 // Omitting .rows
93 const resultLog = common.pick(result || {}, ['command', 'rowCount', 'duration']);
94 this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog });
95 }
96 };
97
98 // Expose these for test coverage
99 this.pgpInitOptions = pgpInitOptions;
100 this._pgp = _pgp;
101
102 this._initStatements(_pgp);
103 }
104
105
106 _queryFileHelper(_pgp) {
107 return (file) => {
108 const _scope = _fileScope('_queryFile');
109 /* istanbul ignore next */
110 const qfParams = {
111 minify: true,
112 ...(this.noWarnings && { noWarnings: this.noWarnings }),
113 };
114 const qf = new _pgp.QueryFile(file, qfParams);
115 if (qf.error) {
116 this.logger.error(_scope, 'failed to create SQL statement', { error: qf.error, file });
117 throw qf.error;
118 }
119 return qf;
120 };
121 }
122
123
124 async initialize(applyMigrations = true) {
125 const _scope = _fileScope('initialize');
126 this.logger.debug(_scope, 'called', { applyMigrations });
127 if (applyMigrations) {
128 await this._initTables();
129 }
130 await super.initialize();
131 if (this.listener) {
132 await this.listener.start();
133 }
134 }
135
136
137 async _initTables(_pgp) {
138 const _scope = _fileScope('_initTables');
139 this.logger.debug(_scope, 'called', {});
140
141 const _queryFile = this._queryFileHelper(_pgp || this._pgp);
142
143 // Migrations rely upon this table, ensure it exists.
144 const metaVersionTable = '_meta_schema_version';
145
146 const tableExists = async (name) => this.db.oneOrNone('SELECT table_name FROM information_schema.tables WHERE table_name=$(name)', { name });
147 let metaExists = await tableExists(metaVersionTable);
148 if (!metaExists) {
149 const fPath = path.join(__dirname, 'sql', 'schema', 'init.sql');
150 const initSql = _queryFile(fPath);
151 const results = await this.db.multiResult(initSql);
152 this.logger.debug(_scope, 'executed init sql', { results });
153 metaExists = await tableExists(metaVersionTable);
154 /* istanbul ignore if */
155 if (!metaExists) {
156 throw new DBErrors.UnexpectedResult(`did not create ${metaVersionTable} table`);
157 }
158 this.logger.info(_scope, 'created schema version table', { metaVersionTable });
159 }
160
161 // Apply migrations
162 const currentSchema = await this._currentSchema();
163 const migrationsWanted = svh.unappliedSchemaVersions(__dirname, currentSchema, this.schemaVersionsSupported);
164 this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
165 for (const v of migrationsWanted) {
166 const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
167 try {
168 const migrationSql = _queryFile(fPath);
169 this.logger.debug(_scope, 'applying migration', { version: v });
170 const results = await this.db.multiResult(migrationSql);
171 this.logger.debug(_scope, 'migration results', { results });
172 this.logger.info(_scope, 'applied migration', { version: v });
173 } catch (e) {
174 this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v });
175 throw e;
176 }
177 }
178 }
179
180
181 _initStatements(_pgp) {
182 const _scope = _fileScope('_initStatements');
183 const _queryFile = this._queryFileHelper(_pgp);
184 this.statement = _pgp.utils.enumSql(path.join(__dirname, 'sql'), {}, _queryFile);
185 this.logger.debug(_scope, 'statements initialized', { statements: Object.keys(this.statement).length });
186 }
187
188
189 async healthCheck() {
190 const _scope = _fileScope('healthCheck');
191 this.logger.debug(_scope, 'called', {});
192 const c = await this.db.connect();
193 c.done();
194 return { serverVersion: c.client.serverVersion };
195 }
196
197
198 async _currentSchema() {
199 return this.db.one('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1');
200 }
201
202
203 async _closeConnection() {
204 const _scope = _fileScope('_closeConnection');
205 try {
206 if (this.listener) {
207 await this.listener.stop();
208 }
209 await this._pgp.end();
210 } catch (e) {
211 this.logger.error(_scope, 'failed', { error: e });
212 throw e;
213 }
214 }
215
216
217 /* istanbul ignore next */
218 async _purgeTables(really = false) {
219 const _scope = _fileScope('_purgeTables');
220 try {
221 if (really) {
222 await this.db.tx(async (t) => {
223 await t.batch([
224 'topic',
225 // 'topic_fetch_in_progress',
226 // 'verification',
227 // 'verification_in_progress',
228 // 'subscription',
229 // 'subscription_delivery_in_progress',
230 ].map(async (table) => t.query('TRUNCATE TABLE $(table:name) CASCADE', { table })));
231 });
232 }
233 } catch (e) {
234 this.logger.error(_scope, 'failed', { error: e });
235 throw e;
236 }
237 }
238
239
240 // eslint-disable-next-line class-methods-use-this
241 _engineInfo(result) {
242 return {
243 changes: result.rowCount,
244 lastInsertRowid: result.rows.length ? result.rows[0].id : undefined,
245 duration: result.duration,
246 };
247 }
248
249
250 // eslint-disable-next-line class-methods-use-this
251 _resultLog(result) {
252 return common.pick(result, ['command', 'rowCount', 'duration']);
253 }
254
255
256 /**
257 * Receive notices when topic entry is updated.
258 * Clear relevant cache entry.
259 * @param {String} payload
260 */
261 _topicChanged(payload) {
262 const _scope = _fileScope('_topicChanged');
263 if (payload !== 'ping') {
264 this.logger.debug(_scope, 'called', { payload });
265 this.cache.delete(payload);
266 }
267 }
268
269
270 /**
271 * Called when a listener connection is opened.
272 * Enable cache.
273 */
274 _listenerEstablished() {
275 const _scope = _fileScope('_listenerEstablished');
276 this.logger.debug(_scope, 'called', {});
277 this.cache = new Map();
278 }
279
280
281 /**
282 * Called when a listener connection is closed.
283 * Disable cache.
284 */
285 _listenerLost() {
286 const _scope = _fileScope('_listenerLost');
287 this.logger.debug(_scope, 'called', {});
288 delete this.cache;
289 }
290
291
292 /**
293 * Return a cached entry, if available.
294 * @param {*} key
295 */
296 _cacheGet(key) {
297 const _scope = _fileScope('_cacheGet');
298 if (this.cache && this.cache.has(key)) {
299 const cacheEntry = this.cache.get(key);
300 this.logger.debug(_scope, 'found cache entry', { key, ...common.pick(cacheEntry, ['added', 'hits', 'lastHit']) });
301 cacheEntry.hits += 1;
302 cacheEntry.lastHit = new Date();
303 return cacheEntry.data;
304 }
305 }
306
307
308 /**
309 * Store an entry in cache, if available.
310 * @param {*} key
311 * @param {*} data
312 */
313 _cacheSet(key, data) {
314 const _scope = _fileScope('_cacheSet');
315 if (this.cache) {
316 this.cache.set(key, {
317 added: new Date(),
318 hits: 0,
319 lastHit: undefined,
320 data,
321 });
322 this.logger.debug(_scope, 'added cache entry', { key });
323 }
324 }
325
326
327 async context(fn) {
328 return this.db.task(async (t) => fn(t));
329 }
330
331
332 // eslint-disable-next-line class-methods-use-this
333 async transaction(dbCtx, fn) {
334 return dbCtx.txIf(async (t) => fn(t));
335 }
336
337
338 async authenticationSuccess(dbCtx, identifier) {
339 const _scope = _fileScope('authenticationSuccess');
340 this.logger.debug(_scope, 'called', { identifier });
341
342 let result;
343 try {
344 result = await dbCtx.result(this.statement.authenticationSuccess, { identifier });
345 if (result.rowCount != 1) {
346 throw new DBErrors.UnexpectedResult('did not update authentication success event');
347 }
348 } catch (e) {
349 this.logger.error(_scope, 'failed', { error: e, identifier });
350 throw e;
351 }
352 }
353
354
355 async authenticationGet(dbCtx, identifier) {
356 const _scope = _fileScope('authenticationGet');
357 this.logger.debug(_scope, 'called', { identifier });
358
359 let auth;
360 try {
361 auth = await dbCtx.oneOrNone(this.statement.authenticationGet, { identifier });
362 return auth;
363 } catch (e) {
364 this.logger.error(_scope, 'failed', { error: e, identifier });
365 throw e;
366 }
367 }
368
369
370 async authenticationUpsert(dbCtx, identifier, credential) {
371 const _scope = _fileScope('authenticationUpsert');
372 const scrubbedCredential = '*'.repeat((credential || '').length);
373 this.logger.debug(_scope, 'called', { identifier, scrubbedCredential });
374
375 let result;
376 try {
377 result = await dbCtx.result(this.statement.authenticationUpsert, { identifier, credential });
378 if (result.rowCount != 1) {
379 throw new DBErrors.UnexpectedResult('did not upsert authentication');
380 }
381 } catch (e) {
382 this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential });
383 throw e;
384 }
385 }
386
387
388 async subscriptionsByTopicId(dbCtx, topicId) {
389 const _scope = _fileScope('subscriptionsByTopicId');
390 this.logger.debug(_scope, 'called', { topicId });
391
392 let count;
393 try {
394 count = await dbCtx.manyOrNone(this.statement.subscriptionsByTopicId, { topicId });
395 return count;
396 } catch (e) {
397 this.logger.error(_scope, 'failed', { error: e, topicId });
398 throw e;
399 }
400 }
401
402
403 async subscriptionCountByTopicUrl(dbCtx, topicUrl) {
404 const _scope = _fileScope('subscriptionCountByTopicUrl');
405 this.logger.debug(_scope, 'called', { topicUrl });
406
407 let count;
408 try {
409 count = await dbCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl });
410 return count;
411 } catch (e) {
412 this.logger.error(_scope, 'failed', { error: e, topicUrl });
413 throw e;
414 }
415 }
416
417
418 async subscriptionDelete(dbCtx, callback, topicId) {
419 const _scope = _fileScope('subscriptionDelete');
420 this.logger.debug(_scope, 'called', { callback, topicId });
421
422 try {
423 const result = await dbCtx.result(this.statement.subscriptionDelete, { callback, topicId });
424 return this._engineInfo(result);
425 } catch (e) {
426 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
427 throw e;
428 }
429 }
430
431
432 async subscriptionDeleteExpired(dbCtx, topicId) {
433 const _scope = _fileScope('subscriptionDeleteExpired');
434 this.logger.debug(_scope, 'called', { topicId });
435
436 try {
437 const result = await dbCtx.result(this.statement.subscriptionDeleteExpired, { topicId });
438 this.logger.debug(_scope, 'success', { topicId, deleted: result.rowCount });
439 return this._engineInfo(result);
440 } catch (e) {
441 this.logger.error(_scope, 'failed', { error: e, topicId });
442 throw e;
443 }
444 }
445
446
447 async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
448 const _scope = _fileScope('subscriptionDeliveryClaim');
449 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
450
451 try {
452 const claims = await dbCtx.txIf(async (txCtx) => {
453 return txCtx.manyOrNone(this.statement.subscriptionDeliveryClaim, { claimant, wanted, claimTimeoutSeconds });
454 });
455 return claims.map((r) => r.id);
456 } catch (e) {
457 this.logger.error(_scope, 'failed', { error: e, claimant, wanted, claimTimeoutSeconds });
458 throw e;
459 }
460 }
461
462
463 async subscriptionDeliveryClaimById(dbCtx, subscriptionId, claimTimeoutSeconds, claimant) {
464 const _scope = _fileScope('subscriptionDeliveryClaimById');
465 this.logger.debug(_scope, 'called', { subscriptionId, claimTimeoutSeconds, claimant });
466
467 let result;
468 try {
469 result = await dbCtx.txIf(async (txCtx) => {
470 result = await txCtx.result(this.statement.subscriptionDeliveryClaimById, { claimant, subscriptionId, claimTimeoutSeconds });
471 if (result.rowCount != 1) {
472 throw new DBErrors.UnexpectedResult('did not claim subscription delivery');
473 }
474 return result;
475 });
476 return this._engineInfo(result);
477 } catch (e) {
478 this.logger.error(_scope, 'failed', { error: e, claimant, subscriptionId, claimTimeoutSeconds });
479 throw e;
480 }
481 }
482
483
484 async subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) {
485 const _scope = _fileScope('subscriptionDeliveryComplete');
486 this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated });
487
488 let result;
489 try {
490 await dbCtx.txIf(async (txCtx) => {
491 result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId, topicContentUpdated });
492 if (result.rowCount != 1) {
493 throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
494 }
495 result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
496 if (result.rowCount != 1) {
497 throw new DBErrors.UnexpectedResult('did not release subscription delivery');
498 }
499 });
500 } catch (e) {
501 this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated });
502 throw e;
503 }
504 }
505
506
507 async subscriptionDeliveryGone(dbCtx, callback, topicId) {
508 const _scope = _fileScope('subscriptionDeliveryGone');
509 this.logger.debug(_scope, 'called', { callback, topicId });
510
511 let result;
512 try {
513 await dbCtx.txIf(async (txCtx) => {
514 result = await txCtx.result(this.statement.subscriptionDelete, { callback, topicId });
515 if (result.rowCount != 1) {
516 throw new DBErrors.UnexpectedResult('did not delete subscription');
517 }
518 // Delete cascades to delivery
519 // result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
520 // if (result.rowCount != 1) {
521 // throw new DBErrors.UnexpectedResult('did not release subscription delivery');
522 // }
523 });
524 } catch (e) {
525 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
526 throw e;
527 }
528 }
529
530
531 async subscriptionDeliveryIncomplete(dbCtx, callback, topicId, retryDelays = [60]) {
532 const _scope = _fileScope('subscriptionDeliveryIncomplete');
533 this.logger.debug(_scope, 'called', { callback, topicId, retryDelays });
534
535 let result;
536 try {
537 await dbCtx.txIf(async (txCtx) => {
538 const { currentAttempt } = await txCtx.one(this.statement.subscriptionDeliveryAttempts, { callback, topicId });
539 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
540 result = await txCtx.result(this.statement.subscriptionDeliveryFailure, { nextAttemptDelaySeconds, callback, topicId });
541 if (result.rowCount != 1) {
542 throw new DBErrors.UnexpectedResult('did not set subscription delivery failure');
543 }
544 result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
545 if (result.rowCount != 1) {
546 throw new DBErrors.UnexpectedResult('did not release subscription delivery');
547 }
548 });
549 } catch (e) {
550 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
551 throw e;
552 }
553 }
554
555
556 async subscriptionGet(dbCtx, callback, topicId) {
557 const _scope = _fileScope('subscriptionGet');
558 this.logger.debug(_scope, 'called', { callback, topicId });
559
560 let subscription;
561 try {
562 subscription = await dbCtx.oneOrNone(this.statement.subscriptionGet, { callback, topicId });
563 return subscription;
564 } catch (e) {
565 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
566 throw e;
567 }
568 }
569
570
571 async subscriptionGetById(dbCtx, subscriptionId) {
572 const _scope = _fileScope('subscriptionGetById');
573 this.logger.debug(_scope, 'called', { subscriptionId });
574
575 let subscription;
576 try {
577 subscription = await dbCtx.oneOrNone(this.statement.subscriptionGetById, { subscriptionId });
578 return subscription;
579 } catch (e) {
580 this.logger.error(_scope, 'failed', { error: e, subscriptionId });
581 throw e;
582 }
583 }
584
585
586 async subscriptionUpdate(dbCtx, data) {
587 const _scope = _fileScope('subscriptionUpdate');
588 this.logger.debug(_scope, 'called', { data });
589
590 const subscriptionData = {
591 ...data,
592 };
593
594 this._subscriptionUpdateDataValidate(subscriptionData);
595
596 let result;
597 try {
598 result = await dbCtx.result(this.statement.subscriptionUpdate, subscriptionData);
599 if (result.rowCount != 1) {
600 throw new DBErrors.UnexpectedResult('did not update subscription');
601 }
602 } catch (e) {
603 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
604 throw e;
605 }
606 }
607
608
609 async subscriptionUpsert(dbCtx, data) {
610 const _scope = _fileScope('subscriptionUpsert');
611 this.logger.debug(_scope, 'called', { ...data });
612
613 const subscriptionData = {
614 secret: null,
615 httpRemoteAddr: null,
616 httpFrom: null,
617 ...data,
618 };
619 this._subscriptionUpsertDataValidate(subscriptionData);
620
621 let result;
622 try {
623 result = await dbCtx.result(this.statement.subscriptionUpsert, subscriptionData);
624 if (result.rowCount != 1) {
625 throw new DBErrors.UnexpectedResult('did not upsert subscription');
626 }
627 return this._engineInfo(result);
628 } catch (e) {
629 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
630 throw e;
631 }
632 }
633
634
635 async topicDeleted(dbCtx, topicId) {
636 const _scope = _fileScope('topicDeleted');
637 this.logger.debug(_scope, 'called', { topicId });
638
639 let result;
640 try {
641 result = await dbCtx.result(this.statement.topicDeleted, { topicId });
642 if (result.rowCount != 1) {
643 throw new DBErrors.UnexpectedResult('did not update topic as deleted');
644 }
645 } catch (e) {
646 this.logger.error(_scope, 'failed to update topic as deleted', { error: e, topicId });
647 throw e;
648 }
649 }
650
651
652 async topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
653 const _scope = _fileScope('topicFetchClaim');
654 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
655
656 let claims;
657 try {
658 await dbCtx.txIf(async (txCtx) => {
659 claims = await txCtx.manyOrNone(this.statement.topicContentFetchClaim, { claimant, wanted, claimTimeoutSeconds });
660 });
661 return claims.map((r) => r.id);
662 } catch (e) {
663 this.logger.error(_scope, 'failed to claim topics for fetch', { error: e });
664 throw e;
665 }
666 }
667
668
669 async topicFetchClaimById(dbCtx, topicId, claimTimeoutSeconds, claimant) {
670 const _scope = _fileScope('topicFetchClaimById');
671 this.logger.debug(_scope, 'called', { topicId, claimTimeoutSeconds, claimant });
672
673 let result;
674 try {
675 await dbCtx.txIf(async (txCtx) => {
676 result = await txCtx.result(this.statement.topicContentFetchClaimById, { topicId, claimant, claimTimeoutSeconds });
677 });
678 return this._engineInfo(result);
679 } catch (e) {
680 this.logger.error(_scope, 'failed', { error: e, topicId });
681 throw e;
682 }
683 }
684
685
686 async topicFetchComplete(dbCtx, topicId) {
687 const _scope = _fileScope('topicFetchComplete');
688 this.logger.debug(_scope, 'called', { topicId });
689
690 let result;
691 try {
692 await dbCtx.txIf(async (txCtx) => {
693 result = await txCtx.result(this.statement.topicAttemptsReset, { topicId });
694 if (result.rowCount != 1) {
695 throw new DBErrors.UnexpectedResult('did not reset topic attempts');
696 }
697 result = await txCtx.result(this.statement.topicContentFetchDone, { topicId });
698 if (result.rowCount != 1) {
699 throw new DBErrors.UnexpectedResult('did not release topic fetch');
700 }
701 });
702 this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
703 return this._engineInfo(result);
704 } catch (e) {
705 this.logger.error(_scope, 'failed', { error: e, result, topicId });
706 throw e;
707 }
708 }
709
710
711 async topicFetchIncomplete(dbCtx, topicId, retryDelays = [60]) {
712 const _scope = _fileScope('topicFetchIncomplete');
713 this.logger.debug(_scope, 'called', { topicId });
714
715 let result;
716 try {
717 result = await dbCtx.txIf(async (txCtx) => {
718 const { contentFetchAttemptsSinceSuccess: currentAttempt } = await txCtx.one(this.statement.topicAttempts, { topicId });
719 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
720 result = await txCtx.result(this.statement.topicAttemptsIncrement, { topicId, nextAttemptDelaySeconds });
721 if (result.rowCount != 1) {
722 throw new DBErrors.UnexpectedResult('did not set topic attempts');
723 }
724 result = await txCtx.result(this.statement.topicContentFetchDone, { topicId });
725 if (result.rowCount != 1) {
726 throw new DBErrors.UnexpectedResult('did not release topic fetch');
727 }
728 return result;
729 });
730 this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
731 return this._engineInfo(result);
732 } catch (e) {
733 this.logger.error(_scope, 'failed', { error: e, result, topicId });
734 throw e;
735 }
736 }
737
738
739 async topicFetchRequested(dbCtx, topicId) {
740 const _scope = _fileScope('topicFetchRequested');
741 this.logger.debug(_scope, 'called', { topicId });
742
743 let result;
744 try {
745 result = await dbCtx.result(this.statement.topicContentFetchRequested, { topicId });
746 if (result.rowCount != 1) {
747 throw new DBErrors.UnexpectedResult('did not set topic fetch requested');
748 }
749 this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
750 return this._engineInfo(result);
751 } catch (e) {
752 this.logger.error(_scope, 'failed', { error: e, topicId });
753 throw e;
754 }
755 }
756
757
758 async topicGetAll(dbCtx) {
759 const _scope = _fileScope('topicGetAll');
760 this.logger.debug(_scope, 'called');
761
762 let topics;
763 try {
764 topics = await dbCtx.manyOrNone(this.statement.topicGetInfoAll);
765 } catch (e) {
766 this.logger.error(_scope, 'failed', { error: e, topics });
767 throw e;
768 }
769 if (topics) {
770 topics = topics.map(this._topicDefaults.bind(this));
771 }
772 return topics;
773 }
774
775
776 async topicGetById(dbCtx, topicId, applyDefaults = true) {
777 const _scope = _fileScope('topicGetById');
778 this.logger.debug(_scope, 'called', { topicId });
779
780 let topic;
781 try {
782 topic = await dbCtx.oneOrNone(this.statement.topicGetById, { topicId });
783 if (applyDefaults) {
784 topic = this._topicDefaults(topic);
785 }
786 return topic;
787 } catch (e) {
788 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
789 throw e;
790 }
791 }
792
793
794 async topicGetByUrl(dbCtx, topicUrl, applyDefaults = true) {
795 const _scope = _fileScope('topicGetByUrl');
796 this.logger.debug(_scope, 'called', { topicUrl });
797
798 let topic;
799 try {
800 topic = await dbCtx.oneOrNone(this.statement.topicGetByUrl, { topicUrl });
801 if (applyDefaults) {
802 topic = this._topicDefaults(topic);
803 }
804 return topic;
805 } catch (e) {
806 this.logger.error(_scope, 'failed', { error: e, topic, topicUrl });
807 throw e;
808 }
809 }
810
811
812 async topicGetContentById(dbCtx, topicId) {
813 const _scope = _fileScope('topicGetContentById');
814 this.logger.debug(_scope, 'called', { topicId });
815
816 let topic;
817 try {
818 topic = this._cacheGet(topicId);
819 if (topic) {
820 return topic;
821 }
822 topic = await dbCtx.oneOrNone(this.statement.topicGetContentById, { topicId });
823 const topicWithDefaults = this._topicDefaults(topic);
824 this._cacheSet(topicId, topicWithDefaults);
825 return topicWithDefaults;
826 } catch (e) {
827 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
828 throw e;
829 }
830 }
831
832
833 async topicPendingDelete(dbCtx, topicId) {
834 const _scope = _fileScope('topicPendingDelete');
835 this.logger.debug(_scope, 'called', { topicId });
836
837 try {
838 await dbCtx.txIf(async (txCtx) => {
839 const topic = await txCtx.one(this.statement.topicGetById, { topicId });
840 if (!topic.isDeleted) {
841 this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
842 return;
843 }
844
845 const { count: subscriberCount } = await txCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl: topic.url });
846 if (subscriberCount) {
847 this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
848 return;
849 }
850
851 const result = await txCtx.result(this.statement.topicDeleteById, { topicId });
852 if (result.rowCount !== 1) {
853 throw new DBErrors.UnexpectedResult('did not delete topic');
854 }
855 });
856 this.logger.debug(_scope, 'success', { topicId });
857 } catch (e) {
858 this.logger.error(_scope, 'failed', { error: e, topicId });
859 throw e;
860 }
861 }
862
863
864 async topicPublishHistory(dbCtx, topicId, days) {
865 const _scope = _fileScope('topicPublishHistory');
866 this.logger.debug(_scope, 'called', { topicId, days });
867
868 const events = await dbCtx.manyOrNone(this.statement.topicPublishHistory, { topicIds: [topicId], daysAgo: days });
869 const history = Array.from({ length: days }, () => 0);
870 events.forEach(({ daysAgo, contentUpdates }) => history[daysAgo] = Number(contentUpdates));
871
872 return history;
873 }
874
875
876 async topicSet(dbCtx, data) {
877 const _scope = _fileScope('topicSet');
878 this.logger.debug(_scope, 'called', data);
879
880 const topicSetData = {
881 publisherValidationUrl: null,
882 leaseSecondsPreferred: null,
883 leaseSecondsMin: null,
884 leaseSecondsMax: null,
885 ...data,
886 };
887
888 let result;
889 try {
890 this._topicSetDataValidate(topicSetData);
891 result = await dbCtx.result(this.statement.topicUpsert, topicSetData);
892 if (result.rowCount != 1) {
893 throw new DBErrors.UnexpectedResult('did not set topic data');
894 }
895 this.logger.debug(_scope, 'success', { topicSetData, ...this._resultLog(result) });
896 return this._engineInfo(result);
897 } catch (e) {
898 this.logger.error(_scope, 'failed', { error: e, result });
899 throw e;
900 }
901 }
902
903
904 async topicSetContent(dbCtx, data) {
905 const _scope = _fileScope('topicSetContent');
906 const topicSetContentData = {
907 contentType: null,
908 httpETag: null,
909 httpLastModified: null,
910 ...data,
911 };
912 const logData = {
913 ...topicSetContentData,
914 content: common.logTruncate(topicSetContentData.content, 100),
915 };
916 this.logger.debug(_scope, 'called', data);
917
918 let result;
919 try {
920 this._topicSetContentDataValidate(topicSetContentData);
921 result = await dbCtx.result(this.statement.topicSetContent, topicSetContentData);
922 logData.result = this._resultLog(result);
923 if (result.rowCount != 1) {
924 throw new DBErrors.UnexpectedResult('did not set topic content');
925 }
926 result = await dbCtx.result(this.statement.topicSetContentHistory, {
927 topicId: data.topicId,
928 contentHash: data.contentHash,
929 contentSize: data.content.length,
930 });
931 if (result.rowCount != 1) {
932 throw new DBErrors.UnexpectedResult('did not set topic content history');
933 }
934 this.logger.debug(_scope, 'success', { ...logData });
935 return this._engineInfo(result);
936 } catch (e) {
937 this.logger.error(_scope, 'failed', { error: e, ...logData });
938 throw e;
939 }
940 }
941
942
943 async topicUpdate(dbCtx, data) {
944 const _scope = _fileScope('topicUpdate');
945 this.logger.debug(_scope, 'called', { data });
946
947 const topicData = {
948 leaseSecondsPreferred: null,
949 leaseSecondsMin: null,
950 leaseSecondsMax: null,
951 publisherValidationUrl: null,
952 ...data,
953 };
954
955 this._topicUpdateDataValidate(topicData);
956
957 let result;
958 try {
959 result = await dbCtx.result(this.statement.topicUpdate, topicData);
960 if (result.rowCount != 1) {
961 throw new DBErrors.UnexpectedResult('did not update topic');
962 }
963 } catch (e) {
964 this.logger.error(_scope, 'failed', { error: e, topicData });
965 throw e;
966 }
967 }
968
969
970 async verificationClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
971 const _scope = _fileScope('verificationClaim');
972 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
973
974 let result;
975 try {
976 await dbCtx.txIf(async (txCtx) => {
977 result = await txCtx.manyOrNone(this.statement.verificationClaim, { claimant, wanted, claimTimeoutSeconds });
978 });
979 return result.map((r) => r.id);
980 } catch (e) {
981 this.logger.error(_scope, 'failed', { wanted, claimTimeoutSeconds });
982 throw e;
983 }
984 }
985
986
987
988 async verificationClaimById(dbCtx, verificationId, claimTimeoutSeconds, claimant) {
989 const _scope = _fileScope('verificationClaimById');
990 this.logger.debug(_scope, 'called', { verificationId, claimant, claimTimeoutSeconds });
991
992 let result;
993 try {
994 await dbCtx.txIf(async (txCtx) => {
995 result = await txCtx.result(this.statement.verificationClaimById, { verificationId, claimant, claimTimeoutSeconds });
996 });
997 return this._engineInfo(result);
998 } catch (e) {
999 this.logger.error(_scope, 'failed', { verificationId, claimant, claimTimeoutSeconds });
1000 throw e;
1001 }
1002 }
1003
1004
1005 async verificationComplete(dbCtx, verificationId, callback, topicId) {
1006 const _scope = _fileScope('verificationComplete');
1007 this.logger.debug(_scope, 'called', { verificationId });
1008
1009 let result;
1010 try {
1011 await dbCtx.txIf(async (txCtx) => {
1012 result = await txCtx.result(this.statement.verificationScrub, { verificationId, callback, topicId });
1013 if (result.rowCount < 1) {
1014 throw new DBErrors.UnexpectedResult('did not remove verifications');
1015 }
1016 });
1017 } catch (e) {
1018 this.logger.error(_scope, 'failed', { verificationId });
1019 throw e;
1020 }
1021 return this._engineInfo(result);
1022 }
1023
1024
1025 async verificationGetById(dbCtx, verificationId) {
1026 const _scope = _fileScope('verificationGetById');
1027 this.logger.debug(_scope, 'called', { verificationId });
1028
1029 let verification;
1030 try {
1031 verification = await dbCtx.oneOrNone(this.statement.verificationGetById, { verificationId });
1032 return verification;
1033 } catch (e) {
1034 this.logger.error(_scope, 'failed', { error: e, verificationId });
1035 throw e;
1036 }
1037 }
1038
1039
1040 async verificationIncomplete(dbCtx, verificationId, retryDelays = [60]) {
1041 const _scope = _fileScope('verificationIncomplete');
1042 this.logger.debug(_scope, 'called', { verificationId });
1043
1044 let result;
1045 try {
1046 await dbCtx.txIf(async (txCtx) => {
1047 const { attempts } = await txCtx.one(this.statement.verificationAttempts, { verificationId });
1048 const nextAttemptDelaySeconds = common.attemptRetrySeconds(attempts, retryDelays);
1049 result = await txCtx.result(this.statement.verificationAttemptIncrement, { verificationId, nextAttemptDelaySeconds });
1050 if (result.rowCount != 1) {
1051 throw new DBErrors.UnexpectedResult('did not update verification attempts');
1052 }
1053 result = await txCtx.result(this.statement.verificationDone, { verificationId });
1054 if (result.rowCount != 1) {
1055 throw new DBErrors.UnexpectedResult('did not release verification');
1056 }
1057 });
1058 } catch (e) {
1059 this.logger.error(_scope, 'failed', { error: e, verificationId });
1060 throw e;
1061 }
1062 }
1063
1064
1065 async verificationInsert(dbCtx, verification) {
1066 const _scope = _fileScope('verificationInsert');
1067 this.logger.debug(_scope, 'called', { verification });
1068
1069 const verificationData = {
1070 secret: null,
1071 httpRemoteAddr: null,
1072 httpFrom: null,
1073 requestId: null,
1074 ...verification,
1075 };
1076
1077 let result, verificationId;
1078 try {
1079 this._verificationDataValidate(verificationData);
1080 result = await dbCtx.result(this.statement.verificationInsert, verificationData);
1081 if (result.rowCount != 1) {
1082 throw new DBErrors.UnexpectedResult('did not insert verification');
1083 }
1084 verificationId = result.rows[0].id;
1085 this.logger.debug(_scope, 'inserted verification', { verificationId });
1086
1087 return verificationId;
1088 } catch (e) {
1089 this.logger.error(_scope, 'failed', { error: e, verificationData });
1090 throw e;
1091 }
1092 }
1093
1094
1095 async verificationRelease(dbCtx, verificationId) {
1096 const _scope = _fileScope('verificationRelease');
1097 this.logger.debug(_scope, 'called', { verificationId });
1098
1099 let result;
1100 try {
1101 result = await dbCtx.result(this.statement.verificationDone, { verificationId });
1102 if (result.rowCount != 1) {
1103 throw new DBErrors.UnexpectedResult('did not release verification');
1104 }
1105 return this._engineInfo(result);
1106 } catch (e) {
1107 this.logger.error(_scope, 'failed', { error: e, verificationId });
1108 throw e;
1109 }
1110 }
1111
1112
1113 async verificationUpdate(dbCtx, verificationId, data) {
1114 const _scope = _fileScope('verificationUpdate');
1115 this.logger.debug(_scope, 'called', { verificationId, data });
1116
1117 const verificationData = {
1118 reason: null,
1119 verificationId,
1120 ...data,
1121 };
1122
1123 let result;
1124 try {
1125 this._verificationUpdateDataValidate(verificationData);
1126 result = await dbCtx.result(this.statement.verificationUpdate, verificationData);
1127 if (result.rowCount != 1) {
1128 throw new DBErrors.UnexpectedResult('did not update verification');
1129 }
1130 } catch (e) {
1131 this.logger.error(_scope, 'failed', { error: e, verificationData });
1132 throw e;
1133 }
1134 }
1135
1136
1137 async verificationValidated(dbCtx, verificationId) {
1138 const _scope = _fileScope('verificationValidated');
1139 this.logger.debug(_scope, 'called', { verificationId });
1140
1141 let result;
1142 try {
1143 result = await dbCtx.result(this.statement.verificationValidate, { verificationId });
1144 if (result.rowCount != 1) {
1145 throw new DBErrors.UnexpectedResult('did not set verification validation');
1146 }
1147 } catch (e) {
1148 this.logger.error(_scope, 'failed', { error: e, verificationId });
1149 throw e;
1150 }
1151 }
1152
1153 }
1154
1155 module.exports = DatabasePostgres;