1 /* eslint-disable security/detect-object-injection */
4 const pgpInitOptions
= {
8 const path
= require('path');
9 const pgp
= require('pg-promise')(pgpInitOptions
);
10 const svh
= require('../schema-version-helper');
11 const Database
= require('../base');
12 const DBErrors
= require('../errors');
13 const common
= require('../../common');
15 const _fileScope
= common
.fileScope(__filename
);
17 const PGTypeIdINT8
= 20; // Type Id 20 == INT8 (BIGINT)
18 const PGTYpeIdINT8Array
= 1016; //Type Id 1016 == INT8[] (BIGINT[])
19 pgp
.pg
.types
.setTypeParser(PGTypeIdINT8
, BigInt
); // Type Id 20 = INT8 (BIGINT)
20 const parseBigIntArray
= pgp
.pg
.types
.getTypeParser(PGTYpeIdINT8Array
); // Type Id 1016 = INT8[] (BIGINT[])
21 pgp
.pg
.types
.setTypeParser(PGTYpeIdINT8Array
, (a
) => parseBigIntArray(a
).map(BigInt
));
23 const schemaVersionsSupported
= {
36 class DatabasePostgres
extends Database
{
37 constructor(logger
, options
, _pgp
= pgp
) {
38 super(logger
, options
);
40 this.db
= _pgp(options
.db
.connectionString
);
41 this.schemaVersionsSupported
= schemaVersionsSupported
;
43 // Suppress QF warnings when running tests
44 this.noWarnings
= options
.db
.noWarnings
;
47 const queryLogLevel
= options
.db
.queryLogLevel
;
49 pgpInitOptions
.query
= (event
) => {
50 this.logger
[queryLogLevel
](_fileScope('pgp:query'), '', { ...common
.pick(event
, ['query', 'params']) });
55 pgpInitOptions
.error
= (err
, event
) => {
56 this.logger
.error(_fileScope('pgp:error'), '', { err
, event
});
59 // Deophidiate column names in-place, log results
60 pgpInitOptions
.receive
= (data
, result
, event
) => {
61 const exemplaryRow
= data
[0];
62 for (const prop
in exemplaryRow
) {
63 const camel
= Database
._camelfy(prop
);
64 if (!(camel
in exemplaryRow
)) {
65 for (const d
of data
) {
73 const resultLog
= common
.pick(result
, ['command', 'rowCount', 'duration']);
74 this.logger
[queryLogLevel
](_fileScope('pgp:result'), '', { query: event
.query
, ...resultLog
});
78 // Expose these for test coverage
79 this.pgpInitOptions
= pgpInitOptions
;
82 this._initStatements(_pgp
);
86 _queryFileHelper(_pgp
) {
88 const _scope
= _fileScope('_queryFile');
91 ...(this.noWarnings
&& { noWarnings: this.noWarnings
}),
93 const qf
= new _pgp
.QueryFile(file
, qfParams
);
95 this.logger
.error(_scope
, 'failed to create SQL statement', { error: qf
.error
, file
});
103 async
schemaCheck(applyMigrations
= true) {
104 const _scope
= _fileScope('schemaCheck');
105 this.logger
.debug(_scope
, 'called', { applyMigrations
});
106 if (applyMigrations
) {
107 await
this._initTables();
109 await
super.schemaCheck();
113 async
_initTables(_pgp
) {
114 const _scope
= _fileScope('_initTables');
115 this.logger
.debug(_scope
, 'called', {});
117 const _queryFile
= this._queryFileHelper(_pgp
|| this._pgp
);
119 // Migrations rely upon this table, ensure it exists.
120 const metaVersionTable
= '_meta_schema_version';
122 const tableExists
= async (name
) => this.db
.oneOrNone('SELECT table_name FROM information_schema.tables WHERE table_name=$(name)', { name
});
123 let metaExists
= await
tableExists(metaVersionTable
);
125 const fPath
= path
.join(__dirname
, 'sql', 'schema', 'init.sql');
126 const initSql
= _queryFile(fPath
);
127 const results
= await
this.db
.multiResult(initSql
);
128 this.logger
.debug(_scope
, 'executed init sql', { results
});
129 metaExists
= await
tableExists(metaVersionTable
);
130 /* istanbul ignore if */
132 throw new DBErrors
.UnexpectedResult(`did not create ${metaVersionTable} table`);
134 this.logger
.info(_scope
, 'created schema version table', { metaVersionTable
});
138 const currentSchema
= await
this._currentSchema();
139 const migrationsWanted
= svh
.unappliedSchemaVersions(__dirname
, currentSchema
, this.schemaVersionsSupported
);
140 this.logger
.debug(_scope
, 'schema migrations wanted', { migrationsWanted
});
141 for (const v
of migrationsWanted
) {
142 const fPath
= path
.join(__dirname
, 'sql', 'schema', v
, 'apply.sql');
143 const migrationSql
= _queryFile(fPath
);
144 const results
= await
this.db
.multiResult(migrationSql
);
145 this.logger
.debug(_scope
, 'executed migration sql', { version: v
, results
});
146 this.logger
.info(_scope
, 'applied migration', { version: v
});
151 _initStatements(_pgp
) {
152 const _scope
= _fileScope('_initStatements');
153 const _queryFile
= this._queryFileHelper(_pgp
);
154 this.statement
= _pgp
.utils
.enumSql(path
.join(__dirname
, 'sql'), {}, _queryFile
);
155 this.logger
.debug(_scope
, 'statements initialized', { statements: Object
.keys(this.statement
).length
});
159 async
healthCheck() {
160 const _scope
= _fileScope('healthCheck');
161 this.logger
.debug(_scope
, 'called', {});
162 const c
= await
this.db
.connect();
164 return { serverVersion: c
.client
.serverVersion
};
168 async
_currentSchema() {
169 return this.db
.one('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1');
173 async
_closeConnection() {
174 const _scope
= _fileScope('_closeConnection');
176 await
this._pgp
.end();
178 this.logger
.error(_scope
, 'failed', { error: e
});
184 async
_purgeTables(really
= false) {
185 const _scope
= _fileScope('_purgeTables');
188 await
this.db
.tx(async (t
) => {
191 // 'topic_fetch_in_progress',
193 // 'verification_in_progress',
195 // 'subscription_delivery_in_progress',
196 ].map(async (table
) => t
.query('TRUNCATE TABLE $(table:name) CASCADE', { table
})));
200 this.logger
.error(_scope
, 'failed', { error: e
});
206 // eslint-disable-next-line class-methods-use-this
207 _engineInfo(result
) {
209 changes: result
.rowCount
,
210 lastInsertRowid: result
.rows
.length
? result
.rows
[0].id : undefined,
211 duration: result
.duration
,
216 // eslint-disable-next-line class-methods-use-this
218 return common
.pick(result
, ['command', 'rowCount', 'duration']);
223 return this.db
.task(async (t
) => fn(t
));
227 // eslint-disable-next-line class-methods-use-this
228 async
transaction(dbCtx
, fn
) {
229 return dbCtx
.txIf(async (t
) => fn(t
));
233 async
authenticationSuccess(dbCtx
, identifier
) {
234 const _scope
= _fileScope('authenticationSuccess');
235 this.logger
.debug(_scope
, 'called', { identifier
});
239 result
= await dbCtx
.result(this.statement
.authenticationSuccess
, { identifier
});
240 if (result
.rowCount
!= 1) {
241 throw new DBErrors
.UnexpectedResult('did not update authentication success event');
244 this.logger
.error(_scope
, 'failed', { error: e
, identifier
});
250 async
authenticationGet(dbCtx
, identifier
) {
251 const _scope
= _fileScope('authenticationGet');
252 this.logger
.debug(_scope
, 'called', { identifier
});
256 auth
= await dbCtx
.oneOrNone(this.statement
.authenticationGet
, { identifier
});
259 this.logger
.error(_scope
, 'failed', { error: e
, identifier
});
265 async
authenticationUpsert(dbCtx
, identifier
, credential
) {
266 const _scope
= _fileScope('authenticationUpsert');
267 const scrubbedCredential
= '*'.repeat((credential
|| '').length
);
268 this.logger
.debug(_scope
, 'called', { identifier
, scrubbedCredential
});
272 result
= await dbCtx
.result(this.statement
.authenticationUpsert
, { identifier
, credential
});
273 if (result
.rowCount
!= 1) {
274 throw new DBErrors
.UnexpectedResult('did not upsert authentication');
277 this.logger
.error(_scope
, 'failed', { error: e
, identifier
, scrubbedCredential
})
283 async
subscriptionsByTopicId(dbCtx
, topicId
) {
284 const _scope
= _fileScope('subscriptionsByTopicId');
285 this.logger
.debug(_scope
, 'called', { topicId
});
289 count
= await dbCtx
.manyOrNone(this.statement
.subscriptionsByTopicId
, { topicId
});
292 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
298 async
subscriptionCountByTopicUrl(dbCtx
, topicUrl
) {
299 const _scope
= _fileScope('subscriptionCountByTopicUrl');
300 this.logger
.debug(_scope
, 'called', { topicUrl
});
304 count
= await dbCtx
.one(this.statement
.subscriptionCountByTopicUrl
, { topicUrl
});
307 this.logger
.error(_scope
, 'failed', { error: e
, topicUrl
});
313 async
subscriptionDelete(dbCtx
, callback
, topicId
) {
314 const _scope
= _fileScope('subscriptionDelete');
315 this.logger
.debug(_scope
, 'called', { callback
, topicId
});
318 const result
= await dbCtx
.result(this.statement
.subscriptionDelete
, { callback
, topicId
});
319 return this._engineInfo(result
);
321 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
});
327 async
subscriptionDeliveryClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
328 const _scope
= _fileScope('subscriptionDeliveryClaim');
329 this.logger
.debug(_scope
, 'called', { wanted
, claimTimeoutSeconds
, claimant
});
332 const claims
= await dbCtx
.txIf(async (txCtx
) => {
333 return txCtx
.manyOrNone(this.statement
.subscriptionDeliveryClaim
, { claimant
, wanted
, claimTimeoutSeconds
});
335 return claims
.map((r
) => r
.id
);
337 this.logger
.error(_scope
, 'failed', { error: e
, claimant
, wanted
, claimTimeoutSeconds
});
343 async
subscriptionDeliveryClaimById(dbCtx
, subscriptionId
, claimTimeoutSeconds
, claimant
) {
344 const _scope
= _fileScope('subscriptionDeliveryClaimById');
345 this.logger
.debug(_scope
, 'called', { subscriptionId
, claimTimeoutSeconds
, claimant
});
349 result
= await dbCtx
.txIf(async (txCtx
) => {
350 result
= await txCtx
.result(this.statement
.subscriptionDeliveryClaimById
, { claimant
, subscriptionId
, claimTimeoutSeconds
});
351 if (result
.rowCount
!= 1) {
352 throw new DBErrors
.UnexpectedResult('did not claim subscription delivery');
356 return this._engineInfo(result
);
358 this.logger
.error(_scope
, 'failed', { error: e
, claimant
, subscriptionId
, claimTimeoutSeconds
});
364 async
subscriptionDeliveryComplete(dbCtx
, callback
, topicId
) {
365 const _scope
= _fileScope('subscriptionDeliveryComplete');
366 this.logger
.debug(_scope
, 'called', { callback
, topicId
});
370 await dbCtx
.txIf(async (txCtx
) => {
371 result
= await txCtx
.result(this.statement
.subscriptionDeliverySuccess
, { callback
, topicId
});
372 if (result
.rowCount
!= 1) {
373 throw new DBErrors
.UnexpectedResult('did not set subscription delivery success');
375 result
= await txCtx
.result(this.statement
.subscriptionDeliveryDone
, { callback
, topicId
});
376 if (result
.rowCount
!= 1) {
377 throw new DBErrors
.UnexpectedResult('did not release subscription delivery');
381 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
});
387 async
subscriptionDeliveryGone(dbCtx
, callback
, topicId
) {
388 const _scope
= _fileScope('subscriptionDeliveryGone');
389 this.logger
.debug(_scope
, 'called', { callback
, topicId
});
393 await dbCtx
.txIf(async (txCtx
) => {
394 result
= await txCtx
.result(this.statement
.subscriptionDelete
, { callback
, topicId
});
395 if (result
.rowCount
!= 1) {
396 throw new DBErrors
.UnexpectedResult('did not delete subscription');
398 // Delete cascades to delivery
399 // result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
400 // if (result.rowCount != 1) {
401 // throw new DBErrors.UnexpectedResult('did not release subscription delivery');
405 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
});
411 async
subscriptionDeliveryIncomplete(dbCtx
, callback
, topicId
, retryDelays
= [60]) {
412 const _scope
= _fileScope('subscriptionDeliveryIncomplete');
413 this.logger
.debug(_scope
, 'called', { callback
, topicId
, retryDelays
});
417 await dbCtx
.txIf(async (txCtx
) => {
418 const { currentAttempt
} = await txCtx
.one(this.statement
.subscriptionDeliveryAttempts
, { callback
, topicId
});
419 const nextAttemptDelaySeconds
= common
.attemptRetrySeconds(currentAttempt
, retryDelays
);
420 result
= await txCtx
.result(this.statement
.subscriptionDeliveryFailure
, { nextAttemptDelaySeconds
, callback
, topicId
});
421 if (result
.rowCount
!= 1) {
422 throw new DBErrors
.UnexpectedResult('did not set subscription delivery failure');
424 result
= await txCtx
.result(this.statement
.subscriptionDeliveryDone
, { callback
, topicId
});
425 if (result
.rowCount
!= 1) {
426 throw new DBErrors
.UnexpectedResult('did not release subscription delivery');
430 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
});
436 async
subscriptionGet(dbCtx
, callback
, topicId
) {
437 const _scope
= _fileScope('subscriptionGet');
438 this.logger
.debug(_scope
, 'called', { callback
, topicId
});
442 subscription
= await dbCtx
.oneOrNone(this.statement
.subscriptionGet
, { callback
, topicId
});
445 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
});
451 async
subscriptionGetById(dbCtx
, subscriptionId
) {
452 const _scope
= _fileScope('subscriptionGetById');
453 this.logger
.debug(_scope
, 'called', { subscriptionId
});
457 subscription
= await dbCtx
.oneOrNone(this.statement
.subscriptionGetById
, { subscriptionId
});
460 this.logger
.error(_scope
, 'failed', { error: e
, subscriptionId
});
466 async
subscriptionUpdate(dbCtx
, data
) {
467 const _scope
= _fileScope('subscriptionUpdate');
468 this.logger
.debug(_scope
, 'called', { data
});
470 const subscriptionData
= {
474 this._subscriptionUpdateDataValidate(subscriptionData
);
478 result
= await dbCtx
.result(this.statement
.subscriptionUpdate
, subscriptionData
);
479 if (result
.rowCount
!= 1) {
480 throw new DBErrors
.UnexpectedResult('did not update subscription');
483 this.logger
.error(_scope
, 'failed', { error: e
, subscriptionData
});
489 async
subscriptionUpsert(dbCtx
, data
) {
490 const _scope
= _fileScope('subscriptionUpsert');
491 this.logger
.debug(_scope
, 'called', { ...data
});
493 const subscriptionData
= {
495 httpRemoteAddr: null,
499 this._subscriptionUpsertDataValidate(subscriptionData
);
503 result
= await dbCtx
.result(this.statement
.subscriptionUpsert
, subscriptionData
);
504 if (result
.rowCount
!= 1) {
505 throw new DBErrors
.UnexpectedResult('did not upsert subscription');
507 return this._engineInfo(result
);
509 this.logger
.error(_scope
, 'failed', { error: e
, subscriptionData
});
515 async
topicDeleted(dbCtx
, topicId
) {
516 const _scope
= _fileScope('topicDeleted');
517 this.logger
.debug(_scope
, 'called', { topicId
});
521 result
= await dbCtx
.result(this.statement
.topicDeleted
, { topicId
});
522 if (result
.rowCount
!= 1) {
523 throw new DBErrors
.UnexpectedResult('did not update topic as deleted');
526 this.logger
.error(_scope
, 'failed to update topic as deleted', { error: e
, topicId
});
532 async
topicFetchClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
533 const _scope
= _fileScope('topicFetchClaim');
534 this.logger
.debug(_scope
, 'called', { wanted
, claimTimeoutSeconds
});
538 await dbCtx
.txIf(async (txCtx
) => {
539 claims
= await txCtx
.manyOrNone(this.statement
.topicContentFetchClaim
, { claimant
, wanted
, claimTimeoutSeconds
});
541 return claims
.map((r
) => r
.id
);
543 this.logger
.error(_scope
, 'failed to claim topics for fetch', { error: e
});
549 async
topicFetchClaimById(dbCtx
, topicId
, claimTimeoutSeconds
, claimant
) {
550 const _scope
= _fileScope('topicFetchClaimById');
551 this.logger
.debug(_scope
, 'called', { topicId
, claimTimeoutSeconds
, claimant
});
555 await dbCtx
.txIf(async (txCtx
) => {
556 result
= await txCtx
.result(this.statement
.topicContentFetchClaimById
, { topicId
, claimant
, claimTimeoutSeconds
});
558 return this._engineInfo(result
);
560 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
566 async
topicFetchComplete(dbCtx
, topicId
) {
567 const _scope
= _fileScope('topicFetchComplete');
568 this.logger
.debug(_scope
, 'called', { topicId
});
572 await dbCtx
.txIf(async (txCtx
) => {
573 result
= await txCtx
.result(this.statement
.topicAttemptsReset
, { topicId
});
574 if (result
.rowCount
!= 1) {
575 throw new DBErrors
.UnexpectedResult('did not reset topic attempts');
577 result
= await txCtx
.result(this.statement
.topicContentFetchDone
, { topicId
});
578 if (result
.rowCount
!= 1) {
579 throw new DBErrors
.UnexpectedResult('did not release topic fetch');
582 this.logger
.debug(_scope
, 'success', { topicId
, ...this._resultLog(result
) });
583 return this._engineInfo(result
);
585 this.logger
.error(_scope
, 'failed', { error: e
, result
, topicId
});
591 async
topicFetchIncomplete(dbCtx
, topicId
, retryDelays
= [60]) {
592 const _scope
= _fileScope('topicFetchIncomplete');
593 this.logger
.debug(_scope
, 'called', { topicId
});
597 result
= await dbCtx
.txIf(async (txCtx
) => {
598 const { contentFetchAttemptsSinceSuccess: currentAttempt
} = await txCtx
.one(this.statement
.topicAttempts
, { topicId
});
599 const nextAttemptDelaySeconds
= common
.attemptRetrySeconds(currentAttempt
, retryDelays
);
600 result
= await txCtx
.result(this.statement
.topicAttemptsIncrement
, { topicId
, nextAttemptDelaySeconds
});
601 if (result
.rowCount
!= 1) {
602 throw new DBErrors
.UnexpectedResult('did not set topic attempts');
604 result
= await txCtx
.result(this.statement
.topicContentFetchDone
, { topicId
});
605 if (result
.rowCount
!= 1) {
606 throw new DBErrors
.UnexpectedResult('did not release topic fetch');
610 this.logger
.debug(_scope
, 'success', { topicId
, ...this._resultLog(result
) });
611 return this._engineInfo(result
);
613 this.logger
.error(_scope
, 'failed', { error: e
, result
, topicId
});
619 async
topicFetchRequested(dbCtx
, topicId
) {
620 const _scope
= _fileScope('topicFetchRequested');
621 this.logger
.debug(_scope
, 'called', { topicId
});
625 result
= await dbCtx
.result(this.statement
.topicContentFetchRequested
, { topicId
});
626 if (result
.rowCount
!= 1) {
627 throw new DBErrors
.UnexpectedResult('did not set topic fetch requested');
629 this.logger
.debug(_scope
, 'success', { topicId
, ...this._resultLog(result
) });
630 return this._engineInfo(result
);
632 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
638 async
topicGetAll(dbCtx
) {
639 const _scope
= _fileScope('topicGetAll');
640 this.logger
.debug(_scope
, 'called');
644 topics
= await dbCtx
.manyOrNone(this.statement
.topicGetInfoAll
);
646 this.logger
.error(_scope
, 'failed', { error: e
, topics
});
650 topics
= topics
.map(this._topicDefaults
.bind(this));
656 async
topicGetById(dbCtx
, topicId
, applyDefaults
= true) {
657 const _scope
= _fileScope('topicGetById');
658 this.logger
.debug(_scope
, 'called', { topicId
});
662 topic
= await dbCtx
.oneOrNone(this.statement
.topicGetById
, { topicId
});
664 topic
= this._topicDefaults(topic
);
668 this.logger
.error(_scope
, 'failed', { error: e
, topic
, topicId
});
674 async
topicGetByUrl(dbCtx
, topicUrl
) {
675 const _scope
= _fileScope('topicGetByUrl');
676 this.logger
.debug(_scope
, 'called', { topicUrl
});
680 topic
= await dbCtx
.oneOrNone(this.statement
.topicGetByUrl
, { topicUrl
});
681 return this._topicDefaults(topic
);
683 this.logger
.error(_scope
, 'failed', { error: e
, topic
, topicUrl
});
689 async
topicGetContentById(dbCtx
, topicId
) {
690 const _scope
= _fileScope('topicGetContentById');
691 this.logger
.debug(_scope
, 'called', { topicId
});
695 topic
= await dbCtx
.oneOrNone(this.statement
.topicGetContentById
, { topicId
});
696 return this._topicDefaults(topic
);
698 this.logger
.error(_scope
, 'failed', { error: e
, topic
, topicId
});
704 async
topicSet(dbCtx
, data
) {
705 const _scope
= _fileScope('topicSet');
706 this.logger
.debug(_scope
, 'called', data
);
708 const topicSetData
= {
709 publisherValidationUrl: null,
710 leaseSecondsPreferred: null,
711 leaseSecondsMin: null,
712 leaseSecondsMax: null,
718 this._topicSetDataValidate(topicSetData
);
719 result
= await dbCtx
.result(this.statement
.topicUpsert
, topicSetData
);
720 if (result
.rowCount
!= 1) {
721 throw new DBErrors
.UnexpectedResult('did not set topic data');
723 this.logger
.debug(_scope
, 'success', { topicSetData
, ...this._resultLog(result
) });
724 return this._engineInfo(result
);
726 this.logger
.error(_scope
, 'failed', { error: e
, result
});
732 async
topicSetContent(dbCtx
, data
) {
733 const _scope
= _fileScope('topicSetContent');
734 const topicSetContentData
= {
739 ...topicSetContentData
,
740 content: common
.logTruncate(topicSetContentData
.content
, 100),
742 this.logger
.debug(_scope
, 'called', data
);
746 this._topicSetContentDataValidate(topicSetContentData
);
747 result
= await dbCtx
.result(this.statement
.topicSetContent
, topicSetContentData
);
748 logData
.result
= this._resultLog(result
);
749 if (result
.rowCount
!= 1) {
750 throw new DBErrors
.UnexpectedResult('did not set topic content');
752 this.logger
.debug(_scope
, 'success', { ...logData
});
753 return this._engineInfo(result
);
755 this.logger
.error(_scope
, 'failed', { error: e
, ...logData
});
761 async
topicUpdate(dbCtx
, data
) {
762 const _scope
= _fileScope('topicUpdate');
763 this.logger
.debug(_scope
, 'called', { data
});
766 leaseSecondsPreferred: null,
767 leaseSecondsMin: null,
768 leaseSecondsMax: null,
769 publisherValidationUrl: null,
773 this._topicUpdateDataValidate(topicData
);
777 result
= await dbCtx
.result(this.statement
.topicUpdate
, topicData
);
778 if (result
.rowCount
!= 1) {
779 throw new DBErrors
.UnexpectedResult('did not update topic');
782 this.logger
.error(_scope
, 'failed', { error: e
, topicData
});
788 async
verificationClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
789 const _scope
= _fileScope('verificationClaim');
790 this.logger
.debug(_scope
, 'called', { wanted
, claimTimeoutSeconds
});
794 await dbCtx
.txIf(async (txCtx
) => {
795 result
= await txCtx
.manyOrNone(this.statement
.verificationClaim
, { claimant
, wanted
, claimTimeoutSeconds
});
797 return result
.map((r
) => r
.id
);
799 this.logger
.error(_scope
, 'failed', { wanted
, claimTimeoutSeconds
});
806 async
verificationClaimById(dbCtx
, verificationId
, claimTimeoutSeconds
, claimant
) {
807 const _scope
= _fileScope('verificationClaimById');
808 this.logger
.debug(_scope
, 'called', { verificationId
, claimant
, claimTimeoutSeconds
});
812 await dbCtx
.txIf(async (txCtx
) => {
813 result
= await txCtx
.result(this.statement
.verificationClaimById
, { verificationId
, claimant
, claimTimeoutSeconds
});
815 return this._engineInfo(result
);
817 this.logger
.error(_scope
, 'failed', { verificationId
, claimant
, claimTimeoutSeconds
});
823 async
verificationComplete(dbCtx
, verificationId
, callback
, topicId
) {
824 const _scope
= _fileScope('verificationComplete');
825 this.logger
.debug(_scope
, 'called', { verificationId
});
829 await dbCtx
.txIf(async (txCtx
) => {
830 result
= await txCtx
.result(this.statement
.verificationScrub
, { verificationId
, callback
, topicId
});
831 if (result
.rowCount
< 1) {
832 throw new DBErrors
.UnexpectedResult('did not remove verifications');
836 this.logger
.error(_scope
, 'failed', { verificationId
});
839 return this._engineInfo(result
);
843 async
verificationGetById(dbCtx
, verificationId
) {
844 const _scope
= _fileScope('verificationGetById');
845 this.logger
.debug(_scope
, 'called', { verificationId
});
849 verification
= await dbCtx
.oneOrNone(this.statement
.verificationGetById
, { verificationId
});
852 this.logger
.error(_scope
, 'failed', { error: e
, verificationId
});
858 async
verificationIncomplete(dbCtx
, verificationId
, retryDelays
= [60]) {
859 const _scope
= _fileScope('verificationIncomplete');
860 this.logger
.debug(_scope
, 'called', { verificationId
});
864 await dbCtx
.txIf(async (txCtx
) => {
865 const { attempts
} = await txCtx
.one(this.statement
.verificationAttempts
, { verificationId
});
866 const nextAttemptDelaySeconds
= common
.attemptRetrySeconds(attempts
, retryDelays
);
867 result
= await txCtx
.result(this.statement
.verificationAttemptIncrement
, { verificationId
, nextAttemptDelaySeconds
});
868 if (result
.rowCount
!= 1) {
869 throw new DBErrors
.UnexpectedResult('did not update verification attempts');
871 result
= await txCtx
.result(this.statement
.verificationDone
, { verificationId
});
872 if (result
.rowCount
!= 1) {
873 throw new DBErrors
.UnexpectedResult('did not release verification');
877 this.logger
.error(_scope
, 'failed', { error: e
, verificationId
});
883 async
verificationInsert(dbCtx
, verification
) {
884 const _scope
= _fileScope('verificationInsert');
885 this.logger
.debug(_scope
, 'called', { verification
});
887 const verificationData
= {
889 httpRemoteAddr: null,
895 let result
, verificationId
;
897 this._verificationDataValidate(verificationData
);
898 result
= await dbCtx
.result(this.statement
.verificationInsert
, verificationData
);
899 if (result
.rowCount
!= 1) {
900 throw new DBErrors
.UnexpectedResult('did not insert verification');
902 verificationId
= result
.rows
[0].id
;
903 this.logger
.debug(_scope
, 'inserted verification', { verificationId
});
905 return verificationId
;
907 this.logger
.error(_scope
, 'failed', { error: e
, verificationData
});
913 async
verificationRelease(dbCtx
, verificationId
) {
914 const _scope
= _fileScope('verificationRelease');
915 this.logger
.debug(_scope
, 'called', { verificationId
});
919 result
= await dbCtx
.result(this.statement
.verificationDone
, { verificationId
});
920 if (result
.rowCount
!= 1) {
921 throw new DBErrors
.UnexpectedResult('did not release verification');
923 return this._engineInfo(result
);
925 this.logger
.error(_scope
, 'failed', { error: e
, verificationId
});
931 async
verificationUpdate(dbCtx
, verificationId
, data
) {
932 const _scope
= _fileScope('verificationUpdate');
933 this.logger
.debug(_scope
, 'called', { verificationId
, data
});
935 const verificationData
= {
943 this._verificationUpdateDataValidate(verificationData
);
944 result
= await dbCtx
.result(this.statement
.verificationUpdate
, verificationData
);
945 if (result
.rowCount
!= 1) {
946 throw new DBErrors
.UnexpectedResult('did not update verification');
949 this.logger
.error(_scope
, 'failed', { error: e
, verificationData
});
955 async
verificationValidated(dbCtx
, verificationId
) {
956 const _scope
= _fileScope('verificationValidated');
957 this.logger
.debug(_scope
, 'called', { verificationId
});
961 result
= await dbCtx
.result(this.statement
.verificationValidate
, { verificationId
});
962 if (result
.rowCount
!= 1) {
963 throw new DBErrors
.UnexpectedResult('did not set verification validation');
966 this.logger
.error(_scope
, 'failed', { error: e
, verificationId
});
973 module
.exports
= DatabasePostgres
;