1 /* eslint-disable security/detect-object-injection */
4 const pgpInitOptions
= {
8 const path
= require('path');
9 const pgp
= require('pg-promise')(pgpInitOptions
);
10 const svh
= require('../schema-version-helper');
11 const Database
= require('../base');
12 const DBErrors
= require('../errors');
13 const Listener
= require('./listener');
14 const common
= require('../../common');
16 const _fileScope
= common
.fileScope(__filename
);
18 const PGTypeIdINT8
= 20; // Type Id 20 == INT8 (BIGINT)
19 const PGTYpeIdINT8Array
= 1016; //Type Id 1016 == INT8[] (BIGINT[])
20 pgp
.pg
.types
.setTypeParser(PGTypeIdINT8
, BigInt
); // Type Id 20 = INT8 (BIGINT)
21 const parseBigIntArray
= pgp
.pg
.types
.getTypeParser(PGTYpeIdINT8Array
); // Type Id 1016 = INT8[] (BIGINT[])
22 pgp
.pg
.types
.setTypeParser(PGTYpeIdINT8Array
, (a
) => parseBigIntArray(a
).map(BigInt
));
24 const schemaVersionsSupported
= {
37 class DatabasePostgres
extends Database
{
38 constructor(logger
, options
, _pgp
= pgp
) {
39 super(logger
, options
);
41 this.db
= _pgp(options
.db
.connectionString
);
42 this.schemaVersionsSupported
= schemaVersionsSupported
;
44 // Suppress QF warnings when running tests
45 this.noWarnings
= options
.db
.noWarnings
;
47 if (options
.db
.cacheEnabled
) {
48 this.listener
= new Listener(logger
, this.db
, Object
.assign({}, options
.db
.listener
, {
49 channel: 'topic_changed',
50 dataCallback: this._topicChanged
.bind(this),
51 connectionEstablishedCallback: this._listenerEstablished
.bind(this),
52 connectionLostCallback: this._listenerLost
.bind(this),
57 const queryLogLevel
= options
.db
.queryLogLevel
;
59 pgpInitOptions
.query
= (event
) => {
60 // Quell outgoing pings
61 if (event
&& event
.query
&& event
.query
.startsWith('NOTIFY')) {
64 this.logger
[queryLogLevel
](_fileScope('pgp:query'), '', { ...common
.pick(event
, ['query', 'params']) });
69 pgpInitOptions
.error
= (err
, event
) => {
70 this.logger
.error(_fileScope('pgp:error'), '', { err
, event
});
73 // Deophidiate column names in-place, log results
74 pgpInitOptions
.receive
= (data
, result
, event
) => {
75 const exemplaryRow
= data
[0];
76 for (const prop
in exemplaryRow
) {
77 const camel
= Database
._camelfy(prop
);
78 if (!(camel
in exemplaryRow
)) {
79 for (const d
of data
) {
86 // Quell outgoing pings
87 if (result
&& result
.command
=== 'NOTIFY') {
91 const resultLog
= common
.pick(result
, ['command', 'rowCount', 'duration']);
92 this.logger
[queryLogLevel
](_fileScope('pgp:result'), '', { query: event
.query
, ...resultLog
});
96 // Expose these for test coverage
97 this.pgpInitOptions
= pgpInitOptions
;
100 this._initStatements(_pgp
);
104 _queryFileHelper(_pgp
) {
106 const _scope
= _fileScope('_queryFile');
107 /* istanbul ignore next */
110 ...(this.noWarnings
&& { noWarnings: this.noWarnings
}),
112 const qf
= new _pgp
.QueryFile(file
, qfParams
);
114 this.logger
.error(_scope
, 'failed to create SQL statement', { error: qf
.error
, file
});
122 async
initialize(applyMigrations
= true) {
123 const _scope
= _fileScope('initialize');
124 this.logger
.debug(_scope
, 'called', { applyMigrations
});
125 if (applyMigrations
) {
126 await
this._initTables();
128 await
super.initialize();
130 await
this.listener
.start();
135 async
_initTables(_pgp
) {
136 const _scope
= _fileScope('_initTables');
137 this.logger
.debug(_scope
, 'called', {});
139 const _queryFile
= this._queryFileHelper(_pgp
|| this._pgp
);
141 // Migrations rely upon this table, ensure it exists.
142 const metaVersionTable
= '_meta_schema_version';
144 const tableExists
= async (name
) => this.db
.oneOrNone('SELECT table_name FROM information_schema.tables WHERE table_name=$(name)', { name
});
145 let metaExists
= await
tableExists(metaVersionTable
);
147 const fPath
= path
.join(__dirname
, 'sql', 'schema', 'init.sql');
148 const initSql
= _queryFile(fPath
);
149 const results
= await
this.db
.multiResult(initSql
);
150 this.logger
.debug(_scope
, 'executed init sql', { results
});
151 metaExists
= await
tableExists(metaVersionTable
);
152 /* istanbul ignore if */
154 throw new DBErrors
.UnexpectedResult(`did not create ${metaVersionTable} table`);
156 this.logger
.info(_scope
, 'created schema version table', { metaVersionTable
});
160 const currentSchema
= await
this._currentSchema();
161 const migrationsWanted
= svh
.unappliedSchemaVersions(__dirname
, currentSchema
, this.schemaVersionsSupported
);
162 this.logger
.debug(_scope
, 'schema migrations wanted', { migrationsWanted
});
163 for (const v
of migrationsWanted
) {
164 const fPath
= path
.join(__dirname
, 'sql', 'schema', v
, 'apply.sql');
166 const migrationSql
= _queryFile(fPath
);
167 this.logger
.debug(_scope
, 'applying migration', { version: v
});
168 const results
= await
this.db
.multiResult(migrationSql
);
169 this.logger
.debug(_scope
, 'migration results', { results
});
170 this.logger
.info(_scope
, 'applied migration', { version: v
});
172 this.logger
.error(_scope
, 'migration failed', { error: e
, fPath
, version: v
});
179 _initStatements(_pgp
) {
180 const _scope
= _fileScope('_initStatements');
181 const _queryFile
= this._queryFileHelper(_pgp
);
182 this.statement
= _pgp
.utils
.enumSql(path
.join(__dirname
, 'sql'), {}, _queryFile
);
183 this.logger
.debug(_scope
, 'statements initialized', { statements: Object
.keys(this.statement
).length
});
187 async
healthCheck() {
188 const _scope
= _fileScope('healthCheck');
189 this.logger
.debug(_scope
, 'called', {});
190 const c
= await
this.db
.connect();
192 return { serverVersion: c
.client
.serverVersion
};
196 async
_currentSchema() {
197 return this.db
.one('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1');
201 async
_closeConnection() {
202 const _scope
= _fileScope('_closeConnection');
205 await
this.listener
.stop();
207 await
this._pgp
.end();
209 this.logger
.error(_scope
, 'failed', { error: e
});
215 /* istanbul ignore next */
216 async
_purgeTables(really
= false) {
217 const _scope
= _fileScope('_purgeTables');
220 await
this.db
.tx(async (t
) => {
223 // 'topic_fetch_in_progress',
225 // 'verification_in_progress',
227 // 'subscription_delivery_in_progress',
228 ].map(async (table
) => t
.query('TRUNCATE TABLE $(table:name) CASCADE', { table
})));
232 this.logger
.error(_scope
, 'failed', { error: e
});
238 // eslint-disable-next-line class-methods-use-this
239 _engineInfo(result
) {
241 changes: result
.rowCount
,
242 lastInsertRowid: result
.rows
.length
? result
.rows
[0].id : undefined,
243 duration: result
.duration
,
248 // eslint-disable-next-line class-methods-use-this
250 return common
.pick(result
, ['command', 'rowCount', 'duration']);
255 * Receive notices when topic entry is updated.
256 * Clear relevant cache entry.
257 * @param {String} payload
259 _topicChanged(payload
) {
260 const _scope
= _fileScope('_topicChanged');
261 if (payload
!== 'ping') {
262 this.logger
.debug(_scope
, 'called', { payload
});
263 this.cache
.delete(payload
);
269 * Called when a listener connection is opened.
272 _listenerEstablished() {
273 const _scope
= _fileScope('_listenerEstablished');
274 this.logger
.debug(_scope
, 'called', {});
275 this.cache
= new Map();
280 * Called when a listener connection is closed.
284 const _scope
= _fileScope('_listenerLost');
285 this.logger
.debug(_scope
, 'called', {});
291 * Return a cached entry, if available.
295 const _scope
= _fileScope('_cacheGet');
296 if (this.cache
&& this.cache
.has(key
)) {
297 const cacheEntry
= this.cache
.get(key
);
298 this.logger
.debug(_scope
, 'found cache entry', { key
, ...common
.pick(cacheEntry
, ['added', 'hits', 'lastHit']) });
299 cacheEntry
.hits
+= 1;
300 cacheEntry
.lastHit
= new Date();
301 return cacheEntry
.data
;
307 * Store an entry in cache, if available.
311 _cacheSet(key
, data
) {
312 const _scope
= _fileScope('_cacheSet');
314 this.cache
.set(key
, {
320 this.logger
.debug(_scope
, 'added cache entry', { key
});
326 return this.db
.task(async (t
) => fn(t
));
330 // eslint-disable-next-line class-methods-use-this
331 async
transaction(dbCtx
, fn
) {
332 return dbCtx
.txIf(async (t
) => fn(t
));
336 async
authenticationSuccess(dbCtx
, identifier
) {
337 const _scope
= _fileScope('authenticationSuccess');
338 this.logger
.debug(_scope
, 'called', { identifier
});
342 result
= await dbCtx
.result(this.statement
.authenticationSuccess
, { identifier
});
343 if (result
.rowCount
!= 1) {
344 throw new DBErrors
.UnexpectedResult('did not update authentication success event');
347 this.logger
.error(_scope
, 'failed', { error: e
, identifier
});
353 async
authenticationGet(dbCtx
, identifier
) {
354 const _scope
= _fileScope('authenticationGet');
355 this.logger
.debug(_scope
, 'called', { identifier
});
359 auth
= await dbCtx
.oneOrNone(this.statement
.authenticationGet
, { identifier
});
362 this.logger
.error(_scope
, 'failed', { error: e
, identifier
});
368 async
authenticationUpsert(dbCtx
, identifier
, credential
) {
369 const _scope
= _fileScope('authenticationUpsert');
370 const scrubbedCredential
= '*'.repeat((credential
|| '').length
);
371 this.logger
.debug(_scope
, 'called', { identifier
, scrubbedCredential
});
375 result
= await dbCtx
.result(this.statement
.authenticationUpsert
, { identifier
, credential
});
376 if (result
.rowCount
!= 1) {
377 throw new DBErrors
.UnexpectedResult('did not upsert authentication');
380 this.logger
.error(_scope
, 'failed', { error: e
, identifier
, scrubbedCredential
});
386 async
subscriptionsByTopicId(dbCtx
, topicId
) {
387 const _scope
= _fileScope('subscriptionsByTopicId');
388 this.logger
.debug(_scope
, 'called', { topicId
});
392 count
= await dbCtx
.manyOrNone(this.statement
.subscriptionsByTopicId
, { topicId
});
395 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
401 async
subscriptionCountByTopicUrl(dbCtx
, topicUrl
) {
402 const _scope
= _fileScope('subscriptionCountByTopicUrl');
403 this.logger
.debug(_scope
, 'called', { topicUrl
});
407 count
= await dbCtx
.one(this.statement
.subscriptionCountByTopicUrl
, { topicUrl
});
410 this.logger
.error(_scope
, 'failed', { error: e
, topicUrl
});
416 async
subscriptionDelete(dbCtx
, callback
, topicId
) {
417 const _scope
= _fileScope('subscriptionDelete');
418 this.logger
.debug(_scope
, 'called', { callback
, topicId
});
421 const result
= await dbCtx
.result(this.statement
.subscriptionDelete
, { callback
, topicId
});
422 return this._engineInfo(result
);
424 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
});
430 async
subscriptionDeleteExpired(dbCtx
, topicId
) {
431 const _scope
= _fileScope('subscriptionDeleteExpired');
432 this.logger
.debug(_scope
, 'called', { topicId
});
435 const result
= await dbCtx
.result(this.statement
.subscriptionDeleteExpired
, { topicId
});
436 this.logger
.debug(_scope
, 'success', { topicId
, deleted: result
.rowCount
});
437 return this._engineInfo(result
);
439 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
445 async
subscriptionDeliveryClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
446 const _scope
= _fileScope('subscriptionDeliveryClaim');
447 this.logger
.debug(_scope
, 'called', { wanted
, claimTimeoutSeconds
, claimant
});
450 const claims
= await dbCtx
.txIf(async (txCtx
) => {
451 return txCtx
.manyOrNone(this.statement
.subscriptionDeliveryClaim
, { claimant
, wanted
, claimTimeoutSeconds
});
453 return claims
.map((r
) => r
.id
);
455 this.logger
.error(_scope
, 'failed', { error: e
, claimant
, wanted
, claimTimeoutSeconds
});
461 async
subscriptionDeliveryClaimById(dbCtx
, subscriptionId
, claimTimeoutSeconds
, claimant
) {
462 const _scope
= _fileScope('subscriptionDeliveryClaimById');
463 this.logger
.debug(_scope
, 'called', { subscriptionId
, claimTimeoutSeconds
, claimant
});
467 result
= await dbCtx
.txIf(async (txCtx
) => {
468 result
= await txCtx
.result(this.statement
.subscriptionDeliveryClaimById
, { claimant
, subscriptionId
, claimTimeoutSeconds
});
469 if (result
.rowCount
!= 1) {
470 throw new DBErrors
.UnexpectedResult('did not claim subscription delivery');
474 return this._engineInfo(result
);
476 this.logger
.error(_scope
, 'failed', { error: e
, claimant
, subscriptionId
, claimTimeoutSeconds
});
482 async
subscriptionDeliveryComplete(dbCtx
, callback
, topicId
, topicContentUpdated
) {
483 const _scope
= _fileScope('subscriptionDeliveryComplete');
484 this.logger
.debug(_scope
, 'called', { callback
, topicId
, topicContentUpdated
});
488 await dbCtx
.txIf(async (txCtx
) => {
489 result
= await txCtx
.result(this.statement
.subscriptionDeliverySuccess
, { callback
, topicId
, topicContentUpdated
});
490 if (result
.rowCount
!= 1) {
491 throw new DBErrors
.UnexpectedResult('did not set subscription delivery success');
493 result
= await txCtx
.result(this.statement
.subscriptionDeliveryDone
, { callback
, topicId
});
494 if (result
.rowCount
!= 1) {
495 throw new DBErrors
.UnexpectedResult('did not release subscription delivery');
499 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
, topicContentUpdated
});
505 async
subscriptionDeliveryGone(dbCtx
, callback
, topicId
) {
506 const _scope
= _fileScope('subscriptionDeliveryGone');
507 this.logger
.debug(_scope
, 'called', { callback
, topicId
});
511 await dbCtx
.txIf(async (txCtx
) => {
512 result
= await txCtx
.result(this.statement
.subscriptionDelete
, { callback
, topicId
});
513 if (result
.rowCount
!= 1) {
514 throw new DBErrors
.UnexpectedResult('did not delete subscription');
516 // Delete cascades to delivery
517 // result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
518 // if (result.rowCount != 1) {
519 // throw new DBErrors.UnexpectedResult('did not release subscription delivery');
523 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
});
529 async
subscriptionDeliveryIncomplete(dbCtx
, callback
, topicId
, retryDelays
= [60]) {
530 const _scope
= _fileScope('subscriptionDeliveryIncomplete');
531 this.logger
.debug(_scope
, 'called', { callback
, topicId
, retryDelays
});
535 await dbCtx
.txIf(async (txCtx
) => {
536 const { currentAttempt
} = await txCtx
.one(this.statement
.subscriptionDeliveryAttempts
, { callback
, topicId
});
537 const nextAttemptDelaySeconds
= common
.attemptRetrySeconds(currentAttempt
, retryDelays
);
538 result
= await txCtx
.result(this.statement
.subscriptionDeliveryFailure
, { nextAttemptDelaySeconds
, callback
, topicId
});
539 if (result
.rowCount
!= 1) {
540 throw new DBErrors
.UnexpectedResult('did not set subscription delivery failure');
542 result
= await txCtx
.result(this.statement
.subscriptionDeliveryDone
, { callback
, topicId
});
543 if (result
.rowCount
!= 1) {
544 throw new DBErrors
.UnexpectedResult('did not release subscription delivery');
548 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
});
554 async
subscriptionGet(dbCtx
, callback
, topicId
) {
555 const _scope
= _fileScope('subscriptionGet');
556 this.logger
.debug(_scope
, 'called', { callback
, topicId
});
560 subscription
= await dbCtx
.oneOrNone(this.statement
.subscriptionGet
, { callback
, topicId
});
563 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
});
569 async
subscriptionGetById(dbCtx
, subscriptionId
) {
570 const _scope
= _fileScope('subscriptionGetById');
571 this.logger
.debug(_scope
, 'called', { subscriptionId
});
575 subscription
= await dbCtx
.oneOrNone(this.statement
.subscriptionGetById
, { subscriptionId
});
578 this.logger
.error(_scope
, 'failed', { error: e
, subscriptionId
});
584 async
subscriptionUpdate(dbCtx
, data
) {
585 const _scope
= _fileScope('subscriptionUpdate');
586 this.logger
.debug(_scope
, 'called', { data
});
588 const subscriptionData
= {
592 this._subscriptionUpdateDataValidate(subscriptionData
);
596 result
= await dbCtx
.result(this.statement
.subscriptionUpdate
, subscriptionData
);
597 if (result
.rowCount
!= 1) {
598 throw new DBErrors
.UnexpectedResult('did not update subscription');
601 this.logger
.error(_scope
, 'failed', { error: e
, subscriptionData
});
607 async
subscriptionUpsert(dbCtx
, data
) {
608 const _scope
= _fileScope('subscriptionUpsert');
609 this.logger
.debug(_scope
, 'called', { ...data
});
611 const subscriptionData
= {
613 httpRemoteAddr: null,
617 this._subscriptionUpsertDataValidate(subscriptionData
);
621 result
= await dbCtx
.result(this.statement
.subscriptionUpsert
, subscriptionData
);
622 if (result
.rowCount
!= 1) {
623 throw new DBErrors
.UnexpectedResult('did not upsert subscription');
625 return this._engineInfo(result
);
627 this.logger
.error(_scope
, 'failed', { error: e
, subscriptionData
});
633 async
topicDeleted(dbCtx
, topicId
) {
634 const _scope
= _fileScope('topicDeleted');
635 this.logger
.debug(_scope
, 'called', { topicId
});
639 result
= await dbCtx
.result(this.statement
.topicDeleted
, { topicId
});
640 if (result
.rowCount
!= 1) {
641 throw new DBErrors
.UnexpectedResult('did not update topic as deleted');
644 this.logger
.error(_scope
, 'failed to update topic as deleted', { error: e
, topicId
});
650 async
topicFetchClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
651 const _scope
= _fileScope('topicFetchClaim');
652 this.logger
.debug(_scope
, 'called', { wanted
, claimTimeoutSeconds
});
656 await dbCtx
.txIf(async (txCtx
) => {
657 claims
= await txCtx
.manyOrNone(this.statement
.topicContentFetchClaim
, { claimant
, wanted
, claimTimeoutSeconds
});
659 return claims
.map((r
) => r
.id
);
661 this.logger
.error(_scope
, 'failed to claim topics for fetch', { error: e
});
667 async
topicFetchClaimById(dbCtx
, topicId
, claimTimeoutSeconds
, claimant
) {
668 const _scope
= _fileScope('topicFetchClaimById');
669 this.logger
.debug(_scope
, 'called', { topicId
, claimTimeoutSeconds
, claimant
});
673 await dbCtx
.txIf(async (txCtx
) => {
674 result
= await txCtx
.result(this.statement
.topicContentFetchClaimById
, { topicId
, claimant
, claimTimeoutSeconds
});
676 return this._engineInfo(result
);
678 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
684 async
topicFetchComplete(dbCtx
, topicId
) {
685 const _scope
= _fileScope('topicFetchComplete');
686 this.logger
.debug(_scope
, 'called', { topicId
});
690 await dbCtx
.txIf(async (txCtx
) => {
691 result
= await txCtx
.result(this.statement
.topicAttemptsReset
, { topicId
});
692 if (result
.rowCount
!= 1) {
693 throw new DBErrors
.UnexpectedResult('did not reset topic attempts');
695 result
= await txCtx
.result(this.statement
.topicContentFetchDone
, { topicId
});
696 if (result
.rowCount
!= 1) {
697 throw new DBErrors
.UnexpectedResult('did not release topic fetch');
700 this.logger
.debug(_scope
, 'success', { topicId
, ...this._resultLog(result
) });
701 return this._engineInfo(result
);
703 this.logger
.error(_scope
, 'failed', { error: e
, result
, topicId
});
709 async
topicFetchIncomplete(dbCtx
, topicId
, retryDelays
= [60]) {
710 const _scope
= _fileScope('topicFetchIncomplete');
711 this.logger
.debug(_scope
, 'called', { topicId
});
715 result
= await dbCtx
.txIf(async (txCtx
) => {
716 const { contentFetchAttemptsSinceSuccess: currentAttempt
} = await txCtx
.one(this.statement
.topicAttempts
, { topicId
});
717 const nextAttemptDelaySeconds
= common
.attemptRetrySeconds(currentAttempt
, retryDelays
);
718 result
= await txCtx
.result(this.statement
.topicAttemptsIncrement
, { topicId
, nextAttemptDelaySeconds
});
719 if (result
.rowCount
!= 1) {
720 throw new DBErrors
.UnexpectedResult('did not set topic attempts');
722 result
= await txCtx
.result(this.statement
.topicContentFetchDone
, { topicId
});
723 if (result
.rowCount
!= 1) {
724 throw new DBErrors
.UnexpectedResult('did not release topic fetch');
728 this.logger
.debug(_scope
, 'success', { topicId
, ...this._resultLog(result
) });
729 return this._engineInfo(result
);
731 this.logger
.error(_scope
, 'failed', { error: e
, result
, topicId
});
737 async
topicFetchRequested(dbCtx
, topicId
) {
738 const _scope
= _fileScope('topicFetchRequested');
739 this.logger
.debug(_scope
, 'called', { topicId
});
743 result
= await dbCtx
.result(this.statement
.topicContentFetchRequested
, { topicId
});
744 if (result
.rowCount
!= 1) {
745 throw new DBErrors
.UnexpectedResult('did not set topic fetch requested');
747 this.logger
.debug(_scope
, 'success', { topicId
, ...this._resultLog(result
) });
748 return this._engineInfo(result
);
750 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
756 async
topicGetAll(dbCtx
) {
757 const _scope
= _fileScope('topicGetAll');
758 this.logger
.debug(_scope
, 'called');
762 topics
= await dbCtx
.manyOrNone(this.statement
.topicGetInfoAll
);
764 this.logger
.error(_scope
, 'failed', { error: e
, topics
});
768 topics
= topics
.map(this._topicDefaults
.bind(this));
774 async
topicGetById(dbCtx
, topicId
, applyDefaults
= true) {
775 const _scope
= _fileScope('topicGetById');
776 this.logger
.debug(_scope
, 'called', { topicId
});
780 topic
= await dbCtx
.oneOrNone(this.statement
.topicGetById
, { topicId
});
782 topic
= this._topicDefaults(topic
);
786 this.logger
.error(_scope
, 'failed', { error: e
, topic
, topicId
});
792 async
topicGetByUrl(dbCtx
, topicUrl
, applyDefaults
= true) {
793 const _scope
= _fileScope('topicGetByUrl');
794 this.logger
.debug(_scope
, 'called', { topicUrl
});
798 topic
= await dbCtx
.oneOrNone(this.statement
.topicGetByUrl
, { topicUrl
});
800 topic
= this._topicDefaults(topic
);
804 this.logger
.error(_scope
, 'failed', { error: e
, topic
, topicUrl
});
810 async
topicGetContentById(dbCtx
, topicId
) {
811 const _scope
= _fileScope('topicGetContentById');
812 this.logger
.debug(_scope
, 'called', { topicId
});
816 topic
= this._cacheGet(topicId
);
820 topic
= await dbCtx
.oneOrNone(this.statement
.topicGetContentById
, { topicId
});
821 const topicWithDefaults
= this._topicDefaults(topic
);
822 this._cacheSet(topicId
, topicWithDefaults
);
823 return topicWithDefaults
;
825 this.logger
.error(_scope
, 'failed', { error: e
, topic
, topicId
});
831 async
topicPendingDelete(dbCtx
, topicId
) {
832 const _scope
= _fileScope('topicPendingDelete');
833 this.logger
.debug(_scope
, 'called', { topicId
});
836 await dbCtx
.txIf(async (txCtx
) => {
837 const topic
= await txCtx
.one(this.statement
.topicGetById
, { topicId
});
838 if (!topic
.isDeleted
) {
839 this.logger
.debug(_scope
, 'topic not set deleted, not deleting', { topicId
});
843 const { count: subscriberCount
} = await txCtx
.one(this.statement
.subscriptionCountByTopicUrl
, { topicUrl: topic
.url
});
844 if (subscriberCount
) {
845 this.logger
.debug(_scope
, 'topic has subscribers, not deleting', { topicId
, subscriberCount
});
849 const result
= await txCtx
.result(this.statement
.topicDeleteById
, { topicId
});
850 if (result
.rowCount
!== 1) {
851 throw new DBErrors
.UnexpectedResult('did not delete topic');
854 this.logger
.debug(_scope
, 'success', { topicId
});
856 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
862 async
topicPublishHistory(dbCtx
, topicId
, days
) {
863 const _scope
= _fileScope('topicPublishHistory');
864 this.logger
.debug(_scope
, 'called', { topicId
, days
});
866 const events
= await dbCtx
.manyOrNone(this.statement
.topicPublishHistory
, { topicIds: [topicId
], daysAgo: days
});
867 const history
= Array
.from({ length: days
}, () => 0);
868 events
.forEach(({ daysAgo
, contentUpdates
}) => history
[daysAgo
] = Number(contentUpdates
));
874 async
topicSet(dbCtx
, data
) {
875 const _scope
= _fileScope('topicSet');
876 this.logger
.debug(_scope
, 'called', data
);
878 const topicSetData
= {
879 publisherValidationUrl: null,
880 leaseSecondsPreferred: null,
881 leaseSecondsMin: null,
882 leaseSecondsMax: null,
888 this._topicSetDataValidate(topicSetData
);
889 result
= await dbCtx
.result(this.statement
.topicUpsert
, topicSetData
);
890 if (result
.rowCount
!= 1) {
891 throw new DBErrors
.UnexpectedResult('did not set topic data');
893 this.logger
.debug(_scope
, 'success', { topicSetData
, ...this._resultLog(result
) });
894 return this._engineInfo(result
);
896 this.logger
.error(_scope
, 'failed', { error: e
, result
});
902 async
topicSetContent(dbCtx
, data
) {
903 const _scope
= _fileScope('topicSetContent');
904 const topicSetContentData
= {
907 httpLastModified: null,
911 ...topicSetContentData
,
912 content: common
.logTruncate(topicSetContentData
.content
, 100),
914 this.logger
.debug(_scope
, 'called', data
);
918 this._topicSetContentDataValidate(topicSetContentData
);
919 result
= await dbCtx
.result(this.statement
.topicSetContent
, topicSetContentData
);
920 logData
.result
= this._resultLog(result
);
921 if (result
.rowCount
!= 1) {
922 throw new DBErrors
.UnexpectedResult('did not set topic content');
924 result
= await dbCtx
.result(this.statement
.topicSetContentHistory
, {
925 topicId: data
.topicId
,
926 contentHash: data
.contentHash
,
927 contentSize: data
.content
.length
,
929 if (result
.rowCount
!= 1) {
930 throw new DBErrors
.UnexpectedResult('did not set topic content history');
932 this.logger
.debug(_scope
, 'success', { ...logData
});
933 return this._engineInfo(result
);
935 this.logger
.error(_scope
, 'failed', { error: e
, ...logData
});
941 async
topicUpdate(dbCtx
, data
) {
942 const _scope
= _fileScope('topicUpdate');
943 this.logger
.debug(_scope
, 'called', { data
});
946 leaseSecondsPreferred: null,
947 leaseSecondsMin: null,
948 leaseSecondsMax: null,
949 publisherValidationUrl: null,
953 this._topicUpdateDataValidate(topicData
);
957 result
= await dbCtx
.result(this.statement
.topicUpdate
, topicData
);
958 if (result
.rowCount
!= 1) {
959 throw new DBErrors
.UnexpectedResult('did not update topic');
962 this.logger
.error(_scope
, 'failed', { error: e
, topicData
});
968 async
verificationClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
969 const _scope
= _fileScope('verificationClaim');
970 this.logger
.debug(_scope
, 'called', { wanted
, claimTimeoutSeconds
});
974 await dbCtx
.txIf(async (txCtx
) => {
975 result
= await txCtx
.manyOrNone(this.statement
.verificationClaim
, { claimant
, wanted
, claimTimeoutSeconds
});
977 return result
.map((r
) => r
.id
);
979 this.logger
.error(_scope
, 'failed', { wanted
, claimTimeoutSeconds
});
986 async
verificationClaimById(dbCtx
, verificationId
, claimTimeoutSeconds
, claimant
) {
987 const _scope
= _fileScope('verificationClaimById');
988 this.logger
.debug(_scope
, 'called', { verificationId
, claimant
, claimTimeoutSeconds
});
992 await dbCtx
.txIf(async (txCtx
) => {
993 result
= await txCtx
.result(this.statement
.verificationClaimById
, { verificationId
, claimant
, claimTimeoutSeconds
});
995 return this._engineInfo(result
);
997 this.logger
.error(_scope
, 'failed', { verificationId
, claimant
, claimTimeoutSeconds
});
1003 async
verificationComplete(dbCtx
, verificationId
, callback
, topicId
) {
1004 const _scope
= _fileScope('verificationComplete');
1005 this.logger
.debug(_scope
, 'called', { verificationId
});
1009 await dbCtx
.txIf(async (txCtx
) => {
1010 result
= await txCtx
.result(this.statement
.verificationScrub
, { verificationId
, callback
, topicId
});
1011 if (result
.rowCount
< 1) {
1012 throw new DBErrors
.UnexpectedResult('did not remove verifications');
1016 this.logger
.error(_scope
, 'failed', { verificationId
});
1019 return this._engineInfo(result
);
1023 async
verificationGetById(dbCtx
, verificationId
) {
1024 const _scope
= _fileScope('verificationGetById');
1025 this.logger
.debug(_scope
, 'called', { verificationId
});
1029 verification
= await dbCtx
.oneOrNone(this.statement
.verificationGetById
, { verificationId
});
1030 return verification
;
1032 this.logger
.error(_scope
, 'failed', { error: e
, verificationId
});
1038 async
verificationIncomplete(dbCtx
, verificationId
, retryDelays
= [60]) {
1039 const _scope
= _fileScope('verificationIncomplete');
1040 this.logger
.debug(_scope
, 'called', { verificationId
});
1044 await dbCtx
.txIf(async (txCtx
) => {
1045 const { attempts
} = await txCtx
.one(this.statement
.verificationAttempts
, { verificationId
});
1046 const nextAttemptDelaySeconds
= common
.attemptRetrySeconds(attempts
, retryDelays
);
1047 result
= await txCtx
.result(this.statement
.verificationAttemptIncrement
, { verificationId
, nextAttemptDelaySeconds
});
1048 if (result
.rowCount
!= 1) {
1049 throw new DBErrors
.UnexpectedResult('did not update verification attempts');
1051 result
= await txCtx
.result(this.statement
.verificationDone
, { verificationId
});
1052 if (result
.rowCount
!= 1) {
1053 throw new DBErrors
.UnexpectedResult('did not release verification');
1057 this.logger
.error(_scope
, 'failed', { error: e
, verificationId
});
1063 async
verificationInsert(dbCtx
, verification
) {
1064 const _scope
= _fileScope('verificationInsert');
1065 this.logger
.debug(_scope
, 'called', { verification
});
1067 const verificationData
= {
1069 httpRemoteAddr: null,
1075 let result
, verificationId
;
1077 this._verificationDataValidate(verificationData
);
1078 result
= await dbCtx
.result(this.statement
.verificationInsert
, verificationData
);
1079 if (result
.rowCount
!= 1) {
1080 throw new DBErrors
.UnexpectedResult('did not insert verification');
1082 verificationId
= result
.rows
[0].id
;
1083 this.logger
.debug(_scope
, 'inserted verification', { verificationId
});
1085 return verificationId
;
1087 this.logger
.error(_scope
, 'failed', { error: e
, verificationData
});
1093 async
verificationRelease(dbCtx
, verificationId
) {
1094 const _scope
= _fileScope('verificationRelease');
1095 this.logger
.debug(_scope
, 'called', { verificationId
});
1099 result
= await dbCtx
.result(this.statement
.verificationDone
, { verificationId
});
1100 if (result
.rowCount
!= 1) {
1101 throw new DBErrors
.UnexpectedResult('did not release verification');
1103 return this._engineInfo(result
);
1105 this.logger
.error(_scope
, 'failed', { error: e
, verificationId
});
1111 async
verificationUpdate(dbCtx
, verificationId
, data
) {
1112 const _scope
= _fileScope('verificationUpdate');
1113 this.logger
.debug(_scope
, 'called', { verificationId
, data
});
1115 const verificationData
= {
1123 this._verificationUpdateDataValidate(verificationData
);
1124 result
= await dbCtx
.result(this.statement
.verificationUpdate
, verificationData
);
1125 if (result
.rowCount
!= 1) {
1126 throw new DBErrors
.UnexpectedResult('did not update verification');
1129 this.logger
.error(_scope
, 'failed', { error: e
, verificationData
});
1135 async
verificationValidated(dbCtx
, verificationId
) {
1136 const _scope
= _fileScope('verificationValidated');
1137 this.logger
.debug(_scope
, 'called', { verificationId
});
1141 result
= await dbCtx
.result(this.statement
.verificationValidate
, { verificationId
});
1142 if (result
.rowCount
!= 1) {
1143 throw new DBErrors
.UnexpectedResult('did not set verification validation');
1146 this.logger
.error(_scope
, 'failed', { error: e
, verificationId
});
1153 module
.exports
= DatabasePostgres
;