Initial release
[websub-hub] / src / db / postgres / index.js
1 /* eslint-disable security/detect-object-injection */
2 'use strict';
3
// Initialization options handed to pg-promise below. The DatabasePostgres
// constructor later attaches its query/error/receive handlers to this object.
const pgpInitOptions = { capSQL: true };
7
8 const path = require('path');
9 const pgp = require('pg-promise')(pgpInitOptions);
10 const svh = require('../schema-version-helper');
11 const Database = require('../base');
12 const DBErrors = require('../errors');
13 const common = require('../../common');
14
// Scope-label helper; its result is passed as the first argument of the log
// calls in this file.
const _fileScope = common.fileScope(__filename);

// Parse BIGINT columns into native BigInt values.
const PGTypeIdINT8 = 20; // Type Id 20 == INT8 (BIGINT)
const PGTypeIdINT8Array = 1016; // Type Id 1016 == INT8[] (BIGINT[])
pgp.pg.types.setTypeParser(PGTypeIdINT8, BigInt);
// Reuse the stock array parser for the text-to-array step, then convert each
// element. NULL array elements are passed through as null, because
// BigInt(null) throws a TypeError.
const parseBigIntArray = pgp.pg.types.getTypeParser(PGTypeIdINT8Array);
pgp.pg.types.setTypeParser(PGTypeIdINT8Array, (a) => parseBigIntArray(a).map((v) => (v === null ? null : BigInt(v))));
22
// Oldest and newest schema versions this engine supports; handed to the
// schema-version helper when selecting migrations to apply.
const schemaVersionsSupported = {
  min: { major: 1, minor: 0, patch: 0 },
  max: { major: 1, minor: 0, patch: 0 },
};
35
36 class DatabasePostgres extends Database {
37 constructor(logger, options, _pgp = pgp) {
38 super(logger, options);
39
40 this.db = _pgp(options.db.connectionString);
41 this.schemaVersionsSupported = schemaVersionsSupported;
42
43 // Suppress QF warnings when running tests
44 this.noWarnings = options.db.noWarnings;
45
46 // Log queries
47 const queryLogLevel = options.db.queryLogLevel;
48 if (queryLogLevel) {
49 pgpInitOptions.query = (event) => {
50 this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event, ['query', 'params']) });
51 };
52 }
53
54 // Log errors
55 pgpInitOptions.error = (err, event) => {
56 this.logger.error(_fileScope('pgp:error'), '', { err, event });
57 };
58
59 // Deophidiate column names in-place, log results
60 pgpInitOptions.receive = (data, result, event) => {
61 const exemplaryRow = data[0];
62 for (const prop in exemplaryRow) {
63 const camel = Database._camelfy(prop);
64 if (!(camel in exemplaryRow)) {
65 for (const d of data) {
66 d[camel] = d[prop];
67 delete d[prop];
68 }
69 }
70 }
71 if (queryLogLevel) {
72 // Omitting .rows
73 const resultLog = common.pick(result, ['command', 'rowCount', 'duration']);
74 this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog });
75 }
76 };
77
78 // Expose these for test coverage
79 this.pgpInitOptions = pgpInitOptions;
80 this._pgp = _pgp;
81
82 this._initStatements(_pgp);
83 }
84
85
86 _queryFileHelper(_pgp) {
87 return (file) => {
88 const _scope = _fileScope('_queryFile');
89 const qfParams = {
90 minify: true,
91 ...(this.noWarnings && { noWarnings: this.noWarnings }),
92 };
93 const qf = new _pgp.QueryFile(file, qfParams);
94 if (qf.error) {
95 this.logger.error(_scope, 'failed to create SQL statement', { error: qf.error, file });
96 throw qf.error;
97 }
98 return qf;
99 };
100 }
101
102
103 async schemaCheck(applyMigrations = true) {
104 const _scope = _fileScope('schemaCheck');
105 this.logger.debug(_scope, 'called', { applyMigrations });
106 if (applyMigrations) {
107 await this._initTables();
108 }
109 await super.schemaCheck();
110 }
111
112
113 async _initTables(_pgp) {
114 const _scope = _fileScope('_initTables');
115 this.logger.debug(_scope, 'called', {});
116
117 const _queryFile = this._queryFileHelper(_pgp || this._pgp);
118
119 // Migrations rely upon this table, ensure it exists.
120 const metaVersionTable = '_meta_schema_version';
121
122 const tableExists = async (name) => this.db.oneOrNone('SELECT table_name FROM information_schema.tables WHERE table_name=$(name)', { name });
123 let metaExists = await tableExists(metaVersionTable);
124 if (!metaExists) {
125 const fPath = path.join(__dirname, 'sql', 'schema', 'init.sql');
126 const initSql = _queryFile(fPath);
127 const results = await this.db.multiResult(initSql);
128 this.logger.debug(_scope, 'executed init sql', { results });
129 metaExists = await tableExists(metaVersionTable);
130 /* istanbul ignore if */
131 if (!metaExists) {
132 throw new DBErrors.UnexpectedResult(`did not create ${metaVersionTable} table`);
133 }
134 this.logger.info(_scope, 'created schema version table', { metaVersionTable });
135 }
136
137 // Apply migrations
138 const currentSchema = await this._currentSchema();
139 const migrationsWanted = svh.unappliedSchemaVersions(__dirname, currentSchema, this.schemaVersionsSupported);
140 this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
141 for (const v of migrationsWanted) {
142 const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
143 const migrationSql = _queryFile(fPath);
144 const results = await this.db.multiResult(migrationSql);
145 this.logger.debug(_scope, 'executed migration sql', { version: v, results });
146 this.logger.info(_scope, 'applied migration', { version: v });
147 }
148 }
149
150
151 _initStatements(_pgp) {
152 const _scope = _fileScope('_initStatements');
153 const _queryFile = this._queryFileHelper(_pgp);
154 this.statement = _pgp.utils.enumSql(path.join(__dirname, 'sql'), {}, _queryFile);
155 this.logger.debug(_scope, 'statements initialized', { statements: Object.keys(this.statement).length });
156 }
157
158
159 async healthCheck() {
160 const _scope = _fileScope('healthCheck');
161 this.logger.debug(_scope, 'called', {});
162 const c = await this.db.connect();
163 c.done();
164 return { serverVersion: c.client.serverVersion };
165 }
166
167
168 async _currentSchema() {
169 return this.db.one('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1');
170 }
171
172
173 async _closeConnection() {
174 const _scope = _fileScope('_closeConnection');
175 try {
176 await this._pgp.end();
177 } catch (e) {
178 this.logger.error(_scope, 'failed', { error: e });
179 throw e;
180 }
181 }
182
183
184 async _purgeTables(really = false) {
185 const _scope = _fileScope('_purgeTables');
186 try {
187 if (really) {
188 await this.db.tx(async (t) => {
189 await t.batch([
190 'topic',
191 // 'topic_fetch_in_progress',
192 // 'verification',
193 // 'verification_in_progress',
194 // 'subscription',
195 // 'subscription_delivery_in_progress',
196 ].map(async (table) => t.query('TRUNCATE TABLE $(table:name) CASCADE', { table })));
197 });
198 }
199 } catch (e) {
200 this.logger.error(_scope, 'failed', { error: e });
201 throw e;
202 }
203 }
204
205
206 // eslint-disable-next-line class-methods-use-this
207 _engineInfo(result) {
208 return {
209 changes: result.rowCount,
210 lastInsertRowid: result.rows.length ? result.rows[0].id : undefined,
211 duration: result.duration,
212 };
213 }
214
215
216 // eslint-disable-next-line class-methods-use-this
217 _resultLog(result) {
218 return common.pick(result, ['command', 'rowCount', 'duration']);
219 }
220
221
222 async context(fn) {
223 return this.db.task(async (t) => fn(t));
224 }
225
226
227 // eslint-disable-next-line class-methods-use-this
228 async transaction(dbCtx, fn) {
229 return dbCtx.txIf(async (t) => fn(t));
230 }
231
232
233 async authenticationSuccess(dbCtx, identifier) {
234 const _scope = _fileScope('authenticationSuccess');
235 this.logger.debug(_scope, 'called', { identifier });
236
237 let result;
238 try {
239 result = await dbCtx.result(this.statement.authenticationSuccess, { identifier });
240 if (result.rowCount != 1) {
241 throw new DBErrors.UnexpectedResult('did not update authentication success event');
242 }
243 } catch (e) {
244 this.logger.error(_scope, 'failed', { error: e, identifier });
245 throw e;
246 }
247 }
248
249
250 async authenticationGet(dbCtx, identifier) {
251 const _scope = _fileScope('authenticationGet');
252 this.logger.debug(_scope, 'called', { identifier });
253
254 let auth;
255 try {
256 auth = await dbCtx.oneOrNone(this.statement.authenticationGet, { identifier });
257 return auth;
258 } catch (e) {
259 this.logger.error(_scope, 'failed', { error: e, identifier });
260 throw e;
261 }
262 }
263
264
265 async authenticationUpsert(dbCtx, identifier, credential) {
266 const _scope = _fileScope('authenticationUpsert');
267 const scrubbedCredential = '*'.repeat((credential || '').length);
268 this.logger.debug(_scope, 'called', { identifier, scrubbedCredential });
269
270 let result;
271 try {
272 result = await dbCtx.result(this.statement.authenticationUpsert, { identifier, credential });
273 if (result.rowCount != 1) {
274 throw new DBErrors.UnexpectedResult('did not upsert authentication');
275 }
276 } catch (e) {
277 this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential })
278 throw e;
279 }
280 }
281
282
283 async subscriptionsByTopicId(dbCtx, topicId) {
284 const _scope = _fileScope('subscriptionsByTopicId');
285 this.logger.debug(_scope, 'called', { topicId });
286
287 let count;
288 try {
289 count = await dbCtx.manyOrNone(this.statement.subscriptionsByTopicId, { topicId });
290 return count;
291 } catch (e) {
292 this.logger.error(_scope, 'failed', { error: e, topicId });
293 throw e;
294 }
295 }
296
297
298 async subscriptionCountByTopicUrl(dbCtx, topicUrl) {
299 const _scope = _fileScope('subscriptionCountByTopicUrl');
300 this.logger.debug(_scope, 'called', { topicUrl });
301
302 let count;
303 try {
304 count = await dbCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl });
305 return count;
306 } catch (e) {
307 this.logger.error(_scope, 'failed', { error: e, topicUrl });
308 throw e;
309 }
310 }
311
312
313 async subscriptionDelete(dbCtx, callback, topicId) {
314 const _scope = _fileScope('subscriptionDelete');
315 this.logger.debug(_scope, 'called', { callback, topicId });
316
317 try {
318 const result = await dbCtx.result(this.statement.subscriptionDelete, { callback, topicId });
319 return this._engineInfo(result);
320 } catch (e) {
321 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
322 throw e;
323 }
324 }
325
326
327 async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
328 const _scope = _fileScope('subscriptionDeliveryClaim');
329 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
330
331 try {
332 const claims = await dbCtx.txIf(async (txCtx) => {
333 return txCtx.manyOrNone(this.statement.subscriptionDeliveryClaim, { claimant, wanted, claimTimeoutSeconds });
334 });
335 return claims.map((r) => r.id);
336 } catch (e) {
337 this.logger.error(_scope, 'failed', { error: e, claimant, wanted, claimTimeoutSeconds });
338 throw e;
339 }
340 }
341
342
343 async subscriptionDeliveryClaimById(dbCtx, subscriptionId, claimTimeoutSeconds, claimant) {
344 const _scope = _fileScope('subscriptionDeliveryClaimById');
345 this.logger.debug(_scope, 'called', { subscriptionId, claimTimeoutSeconds, claimant });
346
347 let result;
348 try {
349 result = await dbCtx.txIf(async (txCtx) => {
350 result = await txCtx.result(this.statement.subscriptionDeliveryClaimById, { claimant, subscriptionId, claimTimeoutSeconds });
351 if (result.rowCount != 1) {
352 throw new DBErrors.UnexpectedResult('did not claim subscription delivery');
353 }
354 return result;
355 });
356 return this._engineInfo(result);
357 } catch (e) {
358 this.logger.error(_scope, 'failed', { error: e, claimant, subscriptionId, claimTimeoutSeconds });
359 throw e;
360 }
361 }
362
363
364 async subscriptionDeliveryComplete(dbCtx, callback, topicId) {
365 const _scope = _fileScope('subscriptionDeliveryComplete');
366 this.logger.debug(_scope, 'called', { callback, topicId });
367
368 let result;
369 try {
370 await dbCtx.txIf(async (txCtx) => {
371 result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId });
372 if (result.rowCount != 1) {
373 throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
374 }
375 result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
376 if (result.rowCount != 1) {
377 throw new DBErrors.UnexpectedResult('did not release subscription delivery');
378 }
379 });
380 } catch (e) {
381 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
382 throw e;
383 }
384 }
385
386
387 async subscriptionDeliveryGone(dbCtx, callback, topicId) {
388 const _scope = _fileScope('subscriptionDeliveryGone');
389 this.logger.debug(_scope, 'called', { callback, topicId });
390
391 let result;
392 try {
393 await dbCtx.txIf(async (txCtx) => {
394 result = await txCtx.result(this.statement.subscriptionDelete, { callback, topicId });
395 if (result.rowCount != 1) {
396 throw new DBErrors.UnexpectedResult('did not delete subscription');
397 }
398 // Delete cascades to delivery
399 // result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
400 // if (result.rowCount != 1) {
401 // throw new DBErrors.UnexpectedResult('did not release subscription delivery');
402 // }
403 });
404 } catch (e) {
405 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
406 throw e;
407 }
408 }
409
410
411 async subscriptionDeliveryIncomplete(dbCtx, callback, topicId, retryDelays = [60]) {
412 const _scope = _fileScope('subscriptionDeliveryIncomplete');
413 this.logger.debug(_scope, 'called', { callback, topicId, retryDelays });
414
415 let result;
416 try {
417 await dbCtx.txIf(async (txCtx) => {
418 const { currentAttempt } = await txCtx.one(this.statement.subscriptionDeliveryAttempts, { callback, topicId });
419 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
420 result = await txCtx.result(this.statement.subscriptionDeliveryFailure, { nextAttemptDelaySeconds, callback, topicId });
421 if (result.rowCount != 1) {
422 throw new DBErrors.UnexpectedResult('did not set subscription delivery failure');
423 }
424 result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
425 if (result.rowCount != 1) {
426 throw new DBErrors.UnexpectedResult('did not release subscription delivery');
427 }
428 });
429 } catch (e) {
430 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
431 throw e;
432 }
433 }
434
435
436 async subscriptionGet(dbCtx, callback, topicId) {
437 const _scope = _fileScope('subscriptionGet');
438 this.logger.debug(_scope, 'called', { callback, topicId });
439
440 let subscription;
441 try {
442 subscription = await dbCtx.oneOrNone(this.statement.subscriptionGet, { callback, topicId });
443 return subscription;
444 } catch (e) {
445 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
446 throw e;
447 }
448 }
449
450
451 async subscriptionGetById(dbCtx, subscriptionId) {
452 const _scope = _fileScope('subscriptionGetById');
453 this.logger.debug(_scope, 'called', { subscriptionId });
454
455 let subscription;
456 try {
457 subscription = await dbCtx.oneOrNone(this.statement.subscriptionGetById, { subscriptionId });
458 return subscription;
459 } catch (e) {
460 this.logger.error(_scope, 'failed', { error: e, subscriptionId });
461 throw e;
462 }
463 }
464
465
466 async subscriptionUpdate(dbCtx, data) {
467 const _scope = _fileScope('subscriptionUpdate');
468 this.logger.debug(_scope, 'called', { data });
469
470 const subscriptionData = {
471 ...data,
472 };
473
474 this._subscriptionUpdateDataValidate(subscriptionData);
475
476 let result;
477 try {
478 result = await dbCtx.result(this.statement.subscriptionUpdate, subscriptionData);
479 if (result.rowCount != 1) {
480 throw new DBErrors.UnexpectedResult('did not update subscription');
481 }
482 } catch (e) {
483 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
484 throw e;
485 }
486 }
487
488
489 async subscriptionUpsert(dbCtx, data) {
490 const _scope = _fileScope('subscriptionUpsert');
491 this.logger.debug(_scope, 'called', { ...data });
492
493 const subscriptionData = {
494 secret: null,
495 httpRemoteAddr: null,
496 httpFrom: null,
497 ...data,
498 };
499 this._subscriptionUpsertDataValidate(subscriptionData);
500
501 let result;
502 try {
503 result = await dbCtx.result(this.statement.subscriptionUpsert, subscriptionData);
504 if (result.rowCount != 1) {
505 throw new DBErrors.UnexpectedResult('did not upsert subscription');
506 }
507 return this._engineInfo(result);
508 } catch (e) {
509 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
510 throw e;
511 }
512 }
513
514
515 async topicDeleted(dbCtx, topicId) {
516 const _scope = _fileScope('topicDeleted');
517 this.logger.debug(_scope, 'called', { topicId });
518
519 let result;
520 try {
521 result = await dbCtx.result(this.statement.topicDeleted, { topicId });
522 if (result.rowCount != 1) {
523 throw new DBErrors.UnexpectedResult('did not update topic as deleted');
524 }
525 } catch (e) {
526 this.logger.error(_scope, 'failed to update topic as deleted', { error: e, topicId });
527 throw e;
528 }
529 }
530
531
532 async topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
533 const _scope = _fileScope('topicFetchClaim');
534 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
535
536 let claims;
537 try {
538 await dbCtx.txIf(async (txCtx) => {
539 claims = await txCtx.manyOrNone(this.statement.topicContentFetchClaim, { claimant, wanted, claimTimeoutSeconds });
540 });
541 return claims.map((r) => r.id);
542 } catch (e) {
543 this.logger.error(_scope, 'failed to claim topics for fetch', { error: e });
544 throw e;
545 }
546 }
547
548
549 async topicFetchClaimById(dbCtx, topicId, claimTimeoutSeconds, claimant) {
550 const _scope = _fileScope('topicFetchClaimById');
551 this.logger.debug(_scope, 'called', { topicId, claimTimeoutSeconds, claimant });
552
553 let result;
554 try {
555 await dbCtx.txIf(async (txCtx) => {
556 result = await txCtx.result(this.statement.topicContentFetchClaimById, { topicId, claimant, claimTimeoutSeconds });
557 });
558 return this._engineInfo(result);
559 } catch (e) {
560 this.logger.error(_scope, 'failed', { error: e, topicId });
561 throw e;
562 }
563 }
564
565
566 async topicFetchComplete(dbCtx, topicId) {
567 const _scope = _fileScope('topicFetchComplete');
568 this.logger.debug(_scope, 'called', { topicId });
569
570 let result;
571 try {
572 await dbCtx.txIf(async (txCtx) => {
573 result = await txCtx.result(this.statement.topicAttemptsReset, { topicId });
574 if (result.rowCount != 1) {
575 throw new DBErrors.UnexpectedResult('did not reset topic attempts');
576 }
577 result = await txCtx.result(this.statement.topicContentFetchDone, { topicId });
578 if (result.rowCount != 1) {
579 throw new DBErrors.UnexpectedResult('did not release topic fetch');
580 }
581 });
582 this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
583 return this._engineInfo(result);
584 } catch (e) {
585 this.logger.error(_scope, 'failed', { error: e, result, topicId });
586 throw e;
587 }
588 }
589
590
591 async topicFetchIncomplete(dbCtx, topicId, retryDelays = [60]) {
592 const _scope = _fileScope('topicFetchIncomplete');
593 this.logger.debug(_scope, 'called', { topicId });
594
595 let result;
596 try {
597 result = await dbCtx.txIf(async (txCtx) => {
598 const { contentFetchAttemptsSinceSuccess: currentAttempt } = await txCtx.one(this.statement.topicAttempts, { topicId });
599 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
600 result = await txCtx.result(this.statement.topicAttemptsIncrement, { topicId, nextAttemptDelaySeconds });
601 if (result.rowCount != 1) {
602 throw new DBErrors.UnexpectedResult('did not set topic attempts');
603 }
604 result = await txCtx.result(this.statement.topicContentFetchDone, { topicId });
605 if (result.rowCount != 1) {
606 throw new DBErrors.UnexpectedResult('did not release topic fetch');
607 }
608 return result;
609 });
610 this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
611 return this._engineInfo(result);
612 } catch (e) {
613 this.logger.error(_scope, 'failed', { error: e, result, topicId });
614 throw e;
615 }
616 }
617
618
619 async topicFetchRequested(dbCtx, topicId) {
620 const _scope = _fileScope('topicFetchRequested');
621 this.logger.debug(_scope, 'called', { topicId });
622
623 let result;
624 try {
625 result = await dbCtx.result(this.statement.topicContentFetchRequested, { topicId });
626 if (result.rowCount != 1) {
627 throw new DBErrors.UnexpectedResult('did not set topic fetch requested');
628 }
629 this.logger.debug(_scope, 'success', { topicId, ...this._resultLog(result) });
630 return this._engineInfo(result);
631 } catch (e) {
632 this.logger.error(_scope, 'failed', { error: e, topicId });
633 throw e;
634 }
635 }
636
637
638 async topicGetAll(dbCtx) {
639 const _scope = _fileScope('topicGetAll');
640 this.logger.debug(_scope, 'called');
641
642 let topics;
643 try {
644 topics = await dbCtx.manyOrNone(this.statement.topicGetInfoAll);
645 } catch (e) {
646 this.logger.error(_scope, 'failed', { error: e, topics });
647 throw e;
648 }
649 if (topics) {
650 topics = topics.map(this._topicDefaults.bind(this));
651 }
652 return topics;
653 }
654
655
656 async topicGetById(dbCtx, topicId, applyDefaults = true) {
657 const _scope = _fileScope('topicGetById');
658 this.logger.debug(_scope, 'called', { topicId });
659
660 let topic;
661 try {
662 topic = await dbCtx.oneOrNone(this.statement.topicGetById, { topicId });
663 if (applyDefaults) {
664 topic = this._topicDefaults(topic);
665 }
666 return topic;
667 } catch (e) {
668 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
669 throw e;
670 }
671 }
672
673
674 async topicGetByUrl(dbCtx, topicUrl) {
675 const _scope = _fileScope('topicGetByUrl');
676 this.logger.debug(_scope, 'called', { topicUrl });
677
678 let topic;
679 try {
680 topic = await dbCtx.oneOrNone(this.statement.topicGetByUrl, { topicUrl });
681 return this._topicDefaults(topic);
682 } catch (e) {
683 this.logger.error(_scope, 'failed', { error: e, topic, topicUrl });
684 throw e;
685 }
686 }
687
688
689 async topicGetContentById(dbCtx, topicId) {
690 const _scope = _fileScope('topicGetContentById');
691 this.logger.debug(_scope, 'called', { topicId });
692
693 let topic;
694 try {
695 topic = await dbCtx.oneOrNone(this.statement.topicGetContentById, { topicId });
696 return this._topicDefaults(topic);
697 } catch (e) {
698 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
699 throw e;
700 }
701 }
702
703
704 async topicSet(dbCtx, data) {
705 const _scope = _fileScope('topicSet');
706 this.logger.debug(_scope, 'called', data);
707
708 const topicSetData = {
709 publisherValidationUrl: null,
710 leaseSecondsPreferred: null,
711 leaseSecondsMin: null,
712 leaseSecondsMax: null,
713 ...data,
714 };
715
716 let result;
717 try {
718 this._topicSetDataValidate(topicSetData);
719 result = await dbCtx.result(this.statement.topicUpsert, topicSetData);
720 if (result.rowCount != 1) {
721 throw new DBErrors.UnexpectedResult('did not set topic data');
722 }
723 this.logger.debug(_scope, 'success', { topicSetData, ...this._resultLog(result) });
724 return this._engineInfo(result);
725 } catch (e) {
726 this.logger.error(_scope, 'failed', { error: e, result });
727 throw e;
728 }
729 }
730
731
732 async topicSetContent(dbCtx, data) {
733 const _scope = _fileScope('topicSetContent');
734 const topicSetContentData = {
735 contentType: null,
736 ...data,
737 };
738 const logData = {
739 ...topicSetContentData,
740 content: common.logTruncate(topicSetContentData.content, 100),
741 };
742 this.logger.debug(_scope, 'called', data);
743
744 let result;
745 try {
746 this._topicSetContentDataValidate(topicSetContentData);
747 result = await dbCtx.result(this.statement.topicSetContent, topicSetContentData);
748 logData.result = this._resultLog(result);
749 if (result.rowCount != 1) {
750 throw new DBErrors.UnexpectedResult('did not set topic content');
751 }
752 this.logger.debug(_scope, 'success', { ...logData });
753 return this._engineInfo(result);
754 } catch (e) {
755 this.logger.error(_scope, 'failed', { error: e, ...logData });
756 throw e;
757 }
758 }
759
760
761 async topicUpdate(dbCtx, data) {
762 const _scope = _fileScope('topicUpdate');
763 this.logger.debug(_scope, 'called', { data });
764
765 const topicData = {
766 leaseSecondsPreferred: null,
767 leaseSecondsMin: null,
768 leaseSecondsMax: null,
769 publisherValidationUrl: null,
770 ...data,
771 };
772
773 this._topicUpdateDataValidate(topicData);
774
775 let result;
776 try {
777 result = await dbCtx.result(this.statement.topicUpdate, topicData);
778 if (result.rowCount != 1) {
779 throw new DBErrors.UnexpectedResult('did not update topic');
780 }
781 } catch (e) {
782 this.logger.error(_scope, 'failed', { error: e, topicData });
783 throw e;
784 }
785 }
786
787
788 async verificationClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
789 const _scope = _fileScope('verificationClaim');
790 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
791
792 let result;
793 try {
794 await dbCtx.txIf(async (txCtx) => {
795 result = await txCtx.manyOrNone(this.statement.verificationClaim, { claimant, wanted, claimTimeoutSeconds });
796 });
797 return result.map((r) => r.id);
798 } catch (e) {
799 this.logger.error(_scope, 'failed', { wanted, claimTimeoutSeconds });
800 throw e;
801 }
802 }
803
804
805
806 async verificationClaimById(dbCtx, verificationId, claimTimeoutSeconds, claimant) {
807 const _scope = _fileScope('verificationClaimById');
808 this.logger.debug(_scope, 'called', { verificationId, claimant, claimTimeoutSeconds });
809
810 let result;
811 try {
812 await dbCtx.txIf(async (txCtx) => {
813 result = await txCtx.result(this.statement.verificationClaimById, { verificationId, claimant, claimTimeoutSeconds });
814 });
815 return this._engineInfo(result);
816 } catch (e) {
817 this.logger.error(_scope, 'failed', { verificationId, claimant, claimTimeoutSeconds });
818 throw e;
819 }
820 }
821
822
823 async verificationComplete(dbCtx, verificationId, callback, topicId) {
824 const _scope = _fileScope('verificationComplete');
825 this.logger.debug(_scope, 'called', { verificationId });
826
827 let result;
828 try {
829 await dbCtx.txIf(async (txCtx) => {
830 result = await txCtx.result(this.statement.verificationScrub, { verificationId, callback, topicId });
831 if (result.rowCount < 1) {
832 throw new DBErrors.UnexpectedResult('did not remove verifications');
833 }
834 });
835 } catch (e) {
836 this.logger.error(_scope, 'failed', { verificationId });
837 throw e;
838 }
839 return this._engineInfo(result);
840 }
841
842
843 async verificationGetById(dbCtx, verificationId) {
844 const _scope = _fileScope('verificationGetById');
845 this.logger.debug(_scope, 'called', { verificationId });
846
847 let verification;
848 try {
849 verification = await dbCtx.oneOrNone(this.statement.verificationGetById, { verificationId });
850 return verification;
851 } catch (e) {
852 this.logger.error(_scope, 'failed', { error: e, verificationId });
853 throw e;
854 }
855 }
856
857
858 async verificationIncomplete(dbCtx, verificationId, retryDelays = [60]) {
859 const _scope = _fileScope('verificationIncomplete');
860 this.logger.debug(_scope, 'called', { verificationId });
861
862 let result;
863 try {
864 await dbCtx.txIf(async (txCtx) => {
865 const { attempts } = await txCtx.one(this.statement.verificationAttempts, { verificationId });
866 const nextAttemptDelaySeconds = common.attemptRetrySeconds(attempts, retryDelays);
867 result = await txCtx.result(this.statement.verificationAttemptIncrement, { verificationId, nextAttemptDelaySeconds });
868 if (result.rowCount != 1) {
869 throw new DBErrors.UnexpectedResult('did not update verification attempts');
870 }
871 result = await txCtx.result(this.statement.verificationDone, { verificationId });
872 if (result.rowCount != 1) {
873 throw new DBErrors.UnexpectedResult('did not release verification');
874 }
875 });
876 } catch (e) {
877 this.logger.error(_scope, 'failed', { error: e, verificationId });
878 throw e;
879 }
880 }
881
882
883 async verificationInsert(dbCtx, verification) {
884 const _scope = _fileScope('verificationInsert');
885 this.logger.debug(_scope, 'called', { verification });
886
887 const verificationData = {
888 secret: null,
889 httpRemoteAddr: null,
890 httpFrom: null,
891 requestId: null,
892 ...verification,
893 };
894
895 let result, verificationId;
896 try {
897 this._verificationDataValidate(verificationData);
898 result = await dbCtx.result(this.statement.verificationInsert, verificationData);
899 if (result.rowCount != 1) {
900 throw new DBErrors.UnexpectedResult('did not insert verification');
901 }
902 verificationId = result.rows[0].id;
903 this.logger.debug(_scope, 'inserted verification', { verificationId });
904
905 return verificationId;
906 } catch (e) {
907 this.logger.error(_scope, 'failed', { error: e, verificationData });
908 throw e;
909 }
910 }
911
912
913 async verificationRelease(dbCtx, verificationId) {
914 const _scope = _fileScope('verificationRelease');
915 this.logger.debug(_scope, 'called', { verificationId });
916
917 let result;
918 try {
919 result = await dbCtx.result(this.statement.verificationDone, { verificationId });
920 if (result.rowCount != 1) {
921 throw new DBErrors.UnexpectedResult('did not release verification');
922 }
923 return this._engineInfo(result);
924 } catch (e) {
925 this.logger.error(_scope, 'failed', { error: e, verificationId });
926 throw e;
927 }
928 }
929
930
931 async verificationUpdate(dbCtx, verificationId, data) {
932 const _scope = _fileScope('verificationUpdate');
933 this.logger.debug(_scope, 'called', { verificationId, data });
934
935 const verificationData = {
936 reason: null,
937 verificationId,
938 ...data,
939 };
940
941 let result;
942 try {
943 this._verificationUpdateDataValidate(verificationData);
944 result = await dbCtx.result(this.statement.verificationUpdate, verificationData);
945 if (result.rowCount != 1) {
946 throw new DBErrors.UnexpectedResult('did not update verification');
947 }
948 } catch (e) {
949 this.logger.error(_scope, 'failed', { error: e, verificationData });
950 throw e;
951 }
952 }
953
954
955 async verificationValidated(dbCtx, verificationId) {
956 const _scope = _fileScope('verificationValidated');
957 this.logger.debug(_scope, 'called', { verificationId });
958
959 let result;
960 try {
961 result = await dbCtx.result(this.statement.verificationValidate, { verificationId });
962 if (result.rowCount != 1) {
963 throw new DBErrors.UnexpectedResult('did not set verification validation');
964 }
965 } catch (e) {
966 this.logger.error(_scope, 'failed', { error: e, verificationId });
967 throw e;
968 }
969 }
970
971 }
972
973 module.exports = DatabasePostgres;