1 /* eslint-disable security/detect-object-injection */
4 const pgpInitOptions
= {
8 const path
= require('path');
9 const pgp
= require('pg-promise')(pgpInitOptions
);
10 const svh
= require('../schema-version-helper');
11 const Database
= require('../base');
12 const DBErrors
= require('../errors');
13 const Listener
= require('./listener');
14 const common
= require('../../common');
16 const _fileScope
= common
.fileScope(__filename
);
18 const PGTypeIdINT8
= 20; // Type Id 20 == INT8 (BIGINT)
19 const PGTYpeIdINT8Array
= 1016; //Type Id 1016 == INT8[] (BIGINT[])
20 pgp
.pg
.types
.setTypeParser(PGTypeIdINT8
, BigInt
); // Type Id 20 = INT8 (BIGINT)
21 const parseBigIntArray
= pgp
.pg
.types
.getTypeParser(PGTYpeIdINT8Array
); // Type Id 1016 = INT8[] (BIGINT[])
22 pgp
.pg
.types
.setTypeParser(PGTYpeIdINT8Array
, (a
) => parseBigIntArray(a
).map(BigInt
));
24 const schemaVersionsSupported
= {
37 class DatabasePostgres
extends Database
{
38 constructor(logger
, options
, _pgp
= pgp
) {
39 super(logger
, options
);
41 this.db
= _pgp(options
.db
.connectionString
);
42 this.schemaVersionsSupported
= schemaVersionsSupported
;
44 // Suppress QF warnings when running tests
45 this.noWarnings
= options
.db
.noWarnings
;
47 if (options
.db
.cacheEnabled
) {
48 this.listener
= new Listener(logger
, this.db
, Object
.assign({}, options
.db
.listener
, {
49 channel: 'topic_changed',
50 dataCallback: this._topicChanged
.bind(this),
51 connectionEstablishedCallback: this._listenerEstablished
.bind(this),
52 connectionLostCallback: this._listenerLost
.bind(this),
57 const queryLogLevel
= options
.db
.queryLogLevel
;
59 pgpInitOptions
.query
= (event
) => {
60 // Quell outgoing pings
61 if (event
&& event
.query
&& event
.query
.startsWith('NOTIFY')) {
64 this.logger
[queryLogLevel
](_fileScope('pgp:query'), '', { ...common
.pick(event
|| {}, ['query', 'params']) });
69 pgpInitOptions
.error
= (err
, event
) => {
70 this.logger
.error(_fileScope('pgp:error'), '', { err
, event
});
72 // TODO: close connection on err.code === '57P03' database shutting down
75 // Deophidiate column names in-place, log results
76 pgpInitOptions
.receive
= ({ data
, result
, ctx: event
}) => {
77 const exemplaryRow
= data
[0];
78 for (const prop
in exemplaryRow
) {
79 const camel
= Database
._camelfy(prop
);
80 if (!(camel
in exemplaryRow
)) {
81 for (const d
of data
) {
88 // Quell outgoing pings
89 if (result
&& result
.command
=== 'NOTIFY') {
93 const resultLog
= common
.pick(result
|| {}, ['command', 'rowCount', 'duration']);
94 this.logger
[queryLogLevel
](_fileScope('pgp:result'), '', { query: event
.query
, ...resultLog
});
98 // Expose these for test coverage
99 this.pgpInitOptions
= pgpInitOptions
;
102 this._initStatements(_pgp
);
106 _queryFileHelper(_pgp
) {
108 const _scope
= _fileScope('_queryFile');
109 /* istanbul ignore next */
112 ...(this.noWarnings
&& { noWarnings: this.noWarnings
}),
114 const qf
= new _pgp
.QueryFile(file
, qfParams
);
116 this.logger
.error(_scope
, 'failed to create SQL statement', { error: qf
.error
, file
});
124 async
initialize(applyMigrations
= true) {
125 const _scope
= _fileScope('initialize');
126 this.logger
.debug(_scope
, 'called', { applyMigrations
});
127 if (applyMigrations
) {
128 await
this._initTables();
130 await
super.initialize();
132 await
this.listener
.start();
137 async
_initTables(_pgp
) {
138 const _scope
= _fileScope('_initTables');
139 this.logger
.debug(_scope
, 'called', {});
141 const _queryFile
= this._queryFileHelper(_pgp
|| this._pgp
);
143 // Migrations rely upon this table, ensure it exists.
144 const metaVersionTable
= '_meta_schema_version';
146 const tableExists
= async (name
) => this.db
.oneOrNone('SELECT table_name FROM information_schema.tables WHERE table_name=$(name)', { name
});
147 let metaExists
= await
tableExists(metaVersionTable
);
149 const fPath
= path
.join(__dirname
, 'sql', 'schema', 'init.sql');
150 const initSql
= _queryFile(fPath
);
151 const results
= await
this.db
.multiResult(initSql
);
152 this.logger
.debug(_scope
, 'executed init sql', { results
});
153 metaExists
= await
tableExists(metaVersionTable
);
154 /* istanbul ignore if */
156 throw new DBErrors
.UnexpectedResult(`did not create ${metaVersionTable} table`);
158 this.logger
.info(_scope
, 'created schema version table', { metaVersionTable
});
162 const currentSchema
= await
this._currentSchema();
163 const migrationsWanted
= svh
.unappliedSchemaVersions(__dirname
, currentSchema
, this.schemaVersionsSupported
);
164 this.logger
.debug(_scope
, 'schema migrations wanted', { migrationsWanted
});
165 for (const v
of migrationsWanted
) {
166 const fPath
= path
.join(__dirname
, 'sql', 'schema', v
, 'apply.sql');
168 const migrationSql
= _queryFile(fPath
);
169 this.logger
.debug(_scope
, 'applying migration', { version: v
});
170 const results
= await
this.db
.multiResult(migrationSql
);
171 this.logger
.debug(_scope
, 'migration results', { results
});
172 this.logger
.info(_scope
, 'applied migration', { version: v
});
174 this.logger
.error(_scope
, 'migration failed', { error: e
, fPath
, version: v
});
181 _initStatements(_pgp
) {
182 const _scope
= _fileScope('_initStatements');
183 const _queryFile
= this._queryFileHelper(_pgp
);
184 this.statement
= _pgp
.utils
.enumSql(path
.join(__dirname
, 'sql'), {}, _queryFile
);
185 this.logger
.debug(_scope
, 'statements initialized', { statements: Object
.keys(this.statement
).length
});
189 async
healthCheck() {
190 const _scope
= _fileScope('healthCheck');
191 this.logger
.debug(_scope
, 'called', {});
192 const c
= await
this.db
.connect();
194 return { serverVersion: c
.client
.serverVersion
};
198 async
_currentSchema() {
199 return this.db
.one('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1');
203 async
_closeConnection() {
204 const _scope
= _fileScope('_closeConnection');
207 await
this.listener
.stop();
209 await
this._pgp
.end();
211 this.logger
.error(_scope
, 'failed', { error: e
});
217 /* istanbul ignore next */
218 async
_purgeTables(really
= false) {
219 const _scope
= _fileScope('_purgeTables');
222 await
this.db
.tx(async (t
) => {
225 // 'topic_fetch_in_progress',
227 // 'verification_in_progress',
229 // 'subscription_delivery_in_progress',
230 ].map(async (table
) => t
.query('TRUNCATE TABLE $(table:name) CASCADE', { table
})));
234 this.logger
.error(_scope
, 'failed', { error: e
});
240 // eslint-disable-next-line class-methods-use-this
241 _engineInfo(result
) {
243 changes: result
.rowCount
,
244 lastInsertRowid: result
.rows
.length
? result
.rows
[0].id : undefined,
245 duration: result
.duration
,
250 // eslint-disable-next-line class-methods-use-this
252 return common
.pick(result
, ['command', 'rowCount', 'duration']);
257 * Receive notices when topic entry is updated.
258 * Clear relevant cache entry.
259 * @param {String} payload
261 _topicChanged(payload
) {
262 const _scope
= _fileScope('_topicChanged');
263 if (payload
!== 'ping') {
264 this.logger
.debug(_scope
, 'called', { payload
});
265 this.cache
.delete(payload
);
271 * Called when a listener connection is opened.
274 _listenerEstablished() {
275 const _scope
= _fileScope('_listenerEstablished');
276 this.logger
.debug(_scope
, 'called', {});
277 this.cache
= new Map();
282 * Called when a listener connection is closed.
286 const _scope
= _fileScope('_listenerLost');
287 this.logger
.debug(_scope
, 'called', {});
293 * Return a cached entry, if available.
297 const _scope
= _fileScope('_cacheGet');
298 if (this.cache
&& this.cache
.has(key
)) {
299 const cacheEntry
= this.cache
.get(key
);
300 this.logger
.debug(_scope
, 'found cache entry', { key
, ...common
.pick(cacheEntry
, ['added', 'hits', 'lastHit']) });
301 cacheEntry
.hits
+= 1;
302 cacheEntry
.lastHit
= new Date();
303 return cacheEntry
.data
;
309 * Store an entry in cache, if available.
313 _cacheSet(key
, data
) {
314 const _scope
= _fileScope('_cacheSet');
316 this.cache
.set(key
, {
322 this.logger
.debug(_scope
, 'added cache entry', { key
});
328 return this.db
.task(async (t
) => fn(t
));
332 // eslint-disable-next-line class-methods-use-this
333 async
transaction(dbCtx
, fn
) {
334 return dbCtx
.txIf(async (t
) => fn(t
));
338 async
authenticationSuccess(dbCtx
, identifier
) {
339 const _scope
= _fileScope('authenticationSuccess');
340 this.logger
.debug(_scope
, 'called', { identifier
});
344 result
= await dbCtx
.result(this.statement
.authenticationSuccess
, { identifier
});
345 if (result
.rowCount
!= 1) {
346 throw new DBErrors
.UnexpectedResult('did not update authentication success event');
349 this.logger
.error(_scope
, 'failed', { error: e
, identifier
});
355 async
authenticationGet(dbCtx
, identifier
) {
356 const _scope
= _fileScope('authenticationGet');
357 this.logger
.debug(_scope
, 'called', { identifier
});
361 auth
= await dbCtx
.oneOrNone(this.statement
.authenticationGet
, { identifier
});
364 this.logger
.error(_scope
, 'failed', { error: e
, identifier
});
370 async
authenticationUpsert(dbCtx
, identifier
, credential
) {
371 const _scope
= _fileScope('authenticationUpsert');
372 const scrubbedCredential
= '*'.repeat((credential
|| '').length
);
373 this.logger
.debug(_scope
, 'called', { identifier
, scrubbedCredential
});
377 result
= await dbCtx
.result(this.statement
.authenticationUpsert
, { identifier
, credential
});
378 if (result
.rowCount
!= 1) {
379 throw new DBErrors
.UnexpectedResult('did not upsert authentication');
382 this.logger
.error(_scope
, 'failed', { error: e
, identifier
, scrubbedCredential
});
388 async
subscriptionsByTopicId(dbCtx
, topicId
) {
389 const _scope
= _fileScope('subscriptionsByTopicId');
390 this.logger
.debug(_scope
, 'called', { topicId
});
394 count
= await dbCtx
.manyOrNone(this.statement
.subscriptionsByTopicId
, { topicId
});
397 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
403 async
subscriptionCountByTopicUrl(dbCtx
, topicUrl
) {
404 const _scope
= _fileScope('subscriptionCountByTopicUrl');
405 this.logger
.debug(_scope
, 'called', { topicUrl
});
409 count
= await dbCtx
.one(this.statement
.subscriptionCountByTopicUrl
, { topicUrl
});
412 this.logger
.error(_scope
, 'failed', { error: e
, topicUrl
});
418 async
subscriptionDelete(dbCtx
, callback
, topicId
) {
419 const _scope
= _fileScope('subscriptionDelete');
420 this.logger
.debug(_scope
, 'called', { callback
, topicId
});
423 const result
= await dbCtx
.result(this.statement
.subscriptionDelete
, { callback
, topicId
});
424 return this._engineInfo(result
);
426 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
});
432 async
subscriptionDeleteExpired(dbCtx
, topicId
) {
433 const _scope
= _fileScope('subscriptionDeleteExpired');
434 this.logger
.debug(_scope
, 'called', { topicId
});
437 const result
= await dbCtx
.result(this.statement
.subscriptionDeleteExpired
, { topicId
});
438 this.logger
.debug(_scope
, 'success', { topicId
, deleted: result
.rowCount
});
439 return this._engineInfo(result
);
441 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
447 async
subscriptionDeliveryClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
448 const _scope
= _fileScope('subscriptionDeliveryClaim');
449 this.logger
.debug(_scope
, 'called', { wanted
, claimTimeoutSeconds
, claimant
});
452 const claims
= await dbCtx
.txIf(async (txCtx
) => {
453 return txCtx
.manyOrNone(this.statement
.subscriptionDeliveryClaim
, { claimant
, wanted
, claimTimeoutSeconds
});
455 return claims
.map((r
) => r
.id
);
457 this.logger
.error(_scope
, 'failed', { error: e
, claimant
, wanted
, claimTimeoutSeconds
});
463 async
subscriptionDeliveryClaimById(dbCtx
, subscriptionId
, claimTimeoutSeconds
, claimant
) {
464 const _scope
= _fileScope('subscriptionDeliveryClaimById');
465 this.logger
.debug(_scope
, 'called', { subscriptionId
, claimTimeoutSeconds
, claimant
});
469 result
= await dbCtx
.txIf(async (txCtx
) => {
470 result
= await txCtx
.result(this.statement
.subscriptionDeliveryClaimById
, { claimant
, subscriptionId
, claimTimeoutSeconds
});
471 if (result
.rowCount
!= 1) {
472 throw new DBErrors
.UnexpectedResult('did not claim subscription delivery');
476 return this._engineInfo(result
);
478 this.logger
.error(_scope
, 'failed', { error: e
, claimant
, subscriptionId
, claimTimeoutSeconds
});
484 async
subscriptionDeliveryComplete(dbCtx
, callback
, topicId
, topicContentUpdated
) {
485 const _scope
= _fileScope('subscriptionDeliveryComplete');
486 this.logger
.debug(_scope
, 'called', { callback
, topicId
, topicContentUpdated
});
490 await dbCtx
.txIf(async (txCtx
) => {
491 result
= await txCtx
.result(this.statement
.subscriptionDeliverySuccess
, { callback
, topicId
, topicContentUpdated
});
492 if (result
.rowCount
!= 1) {
493 throw new DBErrors
.UnexpectedResult('did not set subscription delivery success');
495 result
= await txCtx
.result(this.statement
.subscriptionDeliveryDone
, { callback
, topicId
});
496 if (result
.rowCount
!= 1) {
497 throw new DBErrors
.UnexpectedResult('did not release subscription delivery');
501 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
, topicContentUpdated
});
507 async
subscriptionDeliveryGone(dbCtx
, callback
, topicId
) {
508 const _scope
= _fileScope('subscriptionDeliveryGone');
509 this.logger
.debug(_scope
, 'called', { callback
, topicId
});
513 await dbCtx
.txIf(async (txCtx
) => {
514 result
= await txCtx
.result(this.statement
.subscriptionDelete
, { callback
, topicId
});
515 if (result
.rowCount
!= 1) {
516 throw new DBErrors
.UnexpectedResult('did not delete subscription');
518 // Delete cascades to delivery
519 // result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
520 // if (result.rowCount != 1) {
521 // throw new DBErrors.UnexpectedResult('did not release subscription delivery');
525 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
});
531 async
subscriptionDeliveryIncomplete(dbCtx
, callback
, topicId
, retryDelays
= [60]) {
532 const _scope
= _fileScope('subscriptionDeliveryIncomplete');
533 this.logger
.debug(_scope
, 'called', { callback
, topicId
, retryDelays
});
537 await dbCtx
.txIf(async (txCtx
) => {
538 const { currentAttempt
} = await txCtx
.one(this.statement
.subscriptionDeliveryAttempts
, { callback
, topicId
});
539 const nextAttemptDelaySeconds
= common
.attemptRetrySeconds(currentAttempt
, retryDelays
);
540 result
= await txCtx
.result(this.statement
.subscriptionDeliveryFailure
, { nextAttemptDelaySeconds
, callback
, topicId
});
541 if (result
.rowCount
!= 1) {
542 throw new DBErrors
.UnexpectedResult('did not set subscription delivery failure');
544 result
= await txCtx
.result(this.statement
.subscriptionDeliveryDone
, { callback
, topicId
});
545 if (result
.rowCount
!= 1) {
546 throw new DBErrors
.UnexpectedResult('did not release subscription delivery');
550 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
});
556 async
subscriptionGet(dbCtx
, callback
, topicId
) {
557 const _scope
= _fileScope('subscriptionGet');
558 this.logger
.debug(_scope
, 'called', { callback
, topicId
});
562 subscription
= await dbCtx
.oneOrNone(this.statement
.subscriptionGet
, { callback
, topicId
});
565 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
});
571 async
subscriptionGetById(dbCtx
, subscriptionId
) {
572 const _scope
= _fileScope('subscriptionGetById');
573 this.logger
.debug(_scope
, 'called', { subscriptionId
});
577 subscription
= await dbCtx
.oneOrNone(this.statement
.subscriptionGetById
, { subscriptionId
});
580 this.logger
.error(_scope
, 'failed', { error: e
, subscriptionId
});
586 async
subscriptionUpdate(dbCtx
, data
) {
587 const _scope
= _fileScope('subscriptionUpdate');
588 this.logger
.debug(_scope
, 'called', { data
});
590 const subscriptionData
= {
594 this._subscriptionUpdateDataValidate(subscriptionData
);
598 result
= await dbCtx
.result(this.statement
.subscriptionUpdate
, subscriptionData
);
599 if (result
.rowCount
!= 1) {
600 throw new DBErrors
.UnexpectedResult('did not update subscription');
603 this.logger
.error(_scope
, 'failed', { error: e
, subscriptionData
});
609 async
subscriptionUpsert(dbCtx
, data
) {
610 const _scope
= _fileScope('subscriptionUpsert');
611 this.logger
.debug(_scope
, 'called', { ...data
});
613 const subscriptionData
= {
615 httpRemoteAddr: null,
619 this._subscriptionUpsertDataValidate(subscriptionData
);
623 result
= await dbCtx
.result(this.statement
.subscriptionUpsert
, subscriptionData
);
624 if (result
.rowCount
!= 1) {
625 throw new DBErrors
.UnexpectedResult('did not upsert subscription');
627 return this._engineInfo(result
);
629 this.logger
.error(_scope
, 'failed', { error: e
, subscriptionData
});
635 async
topicDeleted(dbCtx
, topicId
) {
636 const _scope
= _fileScope('topicDeleted');
637 this.logger
.debug(_scope
, 'called', { topicId
});
641 result
= await dbCtx
.result(this.statement
.topicDeleted
, { topicId
});
642 if (result
.rowCount
!= 1) {
643 throw new DBErrors
.UnexpectedResult('did not update topic as deleted');
646 this.logger
.error(_scope
, 'failed to update topic as deleted', { error: e
, topicId
});
652 async
topicFetchClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
653 const _scope
= _fileScope('topicFetchClaim');
654 this.logger
.debug(_scope
, 'called', { wanted
, claimTimeoutSeconds
});
658 await dbCtx
.txIf(async (txCtx
) => {
659 claims
= await txCtx
.manyOrNone(this.statement
.topicContentFetchClaim
, { claimant
, wanted
, claimTimeoutSeconds
});
661 return claims
.map((r
) => r
.id
);
663 this.logger
.error(_scope
, 'failed to claim topics for fetch', { error: e
});
669 async
topicFetchClaimById(dbCtx
, topicId
, claimTimeoutSeconds
, claimant
) {
670 const _scope
= _fileScope('topicFetchClaimById');
671 this.logger
.debug(_scope
, 'called', { topicId
, claimTimeoutSeconds
, claimant
});
675 await dbCtx
.txIf(async (txCtx
) => {
676 result
= await txCtx
.result(this.statement
.topicContentFetchClaimById
, { topicId
, claimant
, claimTimeoutSeconds
});
678 return this._engineInfo(result
);
680 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
686 async
topicFetchComplete(dbCtx
, topicId
) {
687 const _scope
= _fileScope('topicFetchComplete');
688 this.logger
.debug(_scope
, 'called', { topicId
});
692 await dbCtx
.txIf(async (txCtx
) => {
693 result
= await txCtx
.result(this.statement
.topicAttemptsReset
, { topicId
});
694 if (result
.rowCount
!= 1) {
695 throw new DBErrors
.UnexpectedResult('did not reset topic attempts');
697 result
= await txCtx
.result(this.statement
.topicContentFetchDone
, { topicId
});
698 if (result
.rowCount
!= 1) {
699 throw new DBErrors
.UnexpectedResult('did not release topic fetch');
702 this.logger
.debug(_scope
, 'success', { topicId
, ...this._resultLog(result
) });
703 return this._engineInfo(result
);
705 this.logger
.error(_scope
, 'failed', { error: e
, result
, topicId
});
711 async
topicFetchIncomplete(dbCtx
, topicId
, retryDelays
= [60]) {
712 const _scope
= _fileScope('topicFetchIncomplete');
713 this.logger
.debug(_scope
, 'called', { topicId
});
717 result
= await dbCtx
.txIf(async (txCtx
) => {
718 const { contentFetchAttemptsSinceSuccess: currentAttempt
} = await txCtx
.one(this.statement
.topicAttempts
, { topicId
});
719 const nextAttemptDelaySeconds
= common
.attemptRetrySeconds(currentAttempt
, retryDelays
);
720 result
= await txCtx
.result(this.statement
.topicAttemptsIncrement
, { topicId
, nextAttemptDelaySeconds
});
721 if (result
.rowCount
!= 1) {
722 throw new DBErrors
.UnexpectedResult('did not set topic attempts');
724 result
= await txCtx
.result(this.statement
.topicContentFetchDone
, { topicId
});
725 if (result
.rowCount
!= 1) {
726 throw new DBErrors
.UnexpectedResult('did not release topic fetch');
730 this.logger
.debug(_scope
, 'success', { topicId
, ...this._resultLog(result
) });
731 return this._engineInfo(result
);
733 this.logger
.error(_scope
, 'failed', { error: e
, result
, topicId
});
739 async
topicFetchRequested(dbCtx
, topicId
) {
740 const _scope
= _fileScope('topicFetchRequested');
741 this.logger
.debug(_scope
, 'called', { topicId
});
745 result
= await dbCtx
.result(this.statement
.topicContentFetchRequested
, { topicId
});
746 if (result
.rowCount
!= 1) {
747 throw new DBErrors
.UnexpectedResult('did not set topic fetch requested');
749 this.logger
.debug(_scope
, 'success', { topicId
, ...this._resultLog(result
) });
750 return this._engineInfo(result
);
752 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
758 async
topicGetAll(dbCtx
) {
759 const _scope
= _fileScope('topicGetAll');
760 this.logger
.debug(_scope
, 'called');
764 topics
= await dbCtx
.manyOrNone(this.statement
.topicGetInfoAll
);
766 this.logger
.error(_scope
, 'failed', { error: e
, topics
});
770 topics
= topics
.map(this._topicDefaults
.bind(this));
776 async
topicGetById(dbCtx
, topicId
, applyDefaults
= true) {
777 const _scope
= _fileScope('topicGetById');
778 this.logger
.debug(_scope
, 'called', { topicId
});
782 topic
= await dbCtx
.oneOrNone(this.statement
.topicGetById
, { topicId
});
784 topic
= this._topicDefaults(topic
);
788 this.logger
.error(_scope
, 'failed', { error: e
, topic
, topicId
});
794 async
topicGetByUrl(dbCtx
, topicUrl
, applyDefaults
= true) {
795 const _scope
= _fileScope('topicGetByUrl');
796 this.logger
.debug(_scope
, 'called', { topicUrl
});
800 topic
= await dbCtx
.oneOrNone(this.statement
.topicGetByUrl
, { topicUrl
});
802 topic
= this._topicDefaults(topic
);
806 this.logger
.error(_scope
, 'failed', { error: e
, topic
, topicUrl
});
812 async
topicGetContentById(dbCtx
, topicId
) {
813 const _scope
= _fileScope('topicGetContentById');
814 this.logger
.debug(_scope
, 'called', { topicId
});
818 topic
= this._cacheGet(topicId
);
822 topic
= await dbCtx
.oneOrNone(this.statement
.topicGetContentById
, { topicId
});
823 const topicWithDefaults
= this._topicDefaults(topic
);
824 this._cacheSet(topicId
, topicWithDefaults
);
825 return topicWithDefaults
;
827 this.logger
.error(_scope
, 'failed', { error: e
, topic
, topicId
});
833 async
topicPendingDelete(dbCtx
, topicId
) {
834 const _scope
= _fileScope('topicPendingDelete');
835 this.logger
.debug(_scope
, 'called', { topicId
});
838 await dbCtx
.txIf(async (txCtx
) => {
839 const topic
= await txCtx
.one(this.statement
.topicGetById
, { topicId
});
840 if (!topic
.isDeleted
) {
841 this.logger
.debug(_scope
, 'topic not set deleted, not deleting', { topicId
});
845 const { count: subscriberCount
} = await txCtx
.one(this.statement
.subscriptionCountByTopicUrl
, { topicUrl: topic
.url
});
846 if (subscriberCount
) {
847 this.logger
.debug(_scope
, 'topic has subscribers, not deleting', { topicId
, subscriberCount
});
851 const result
= await txCtx
.result(this.statement
.topicDeleteById
, { topicId
});
852 if (result
.rowCount
!== 1) {
853 throw new DBErrors
.UnexpectedResult('did not delete topic');
856 this.logger
.debug(_scope
, 'success', { topicId
});
858 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
864 async
topicPublishHistory(dbCtx
, topicId
, days
) {
865 const _scope
= _fileScope('topicPublishHistory');
866 this.logger
.debug(_scope
, 'called', { topicId
, days
});
868 const events
= await dbCtx
.manyOrNone(this.statement
.topicPublishHistory
, { topicIds: [topicId
], daysAgo: days
});
869 const history
= Array
.from({ length: days
}, () => 0);
870 events
.forEach(({ daysAgo
, contentUpdates
}) => history
[daysAgo
] = Number(contentUpdates
));
876 async
topicSet(dbCtx
, data
) {
877 const _scope
= _fileScope('topicSet');
878 this.logger
.debug(_scope
, 'called', data
);
880 const topicSetData
= {
881 publisherValidationUrl: null,
882 leaseSecondsPreferred: null,
883 leaseSecondsMin: null,
884 leaseSecondsMax: null,
890 this._topicSetDataValidate(topicSetData
);
891 result
= await dbCtx
.result(this.statement
.topicUpsert
, topicSetData
);
892 if (result
.rowCount
!= 1) {
893 throw new DBErrors
.UnexpectedResult('did not set topic data');
895 this.logger
.debug(_scope
, 'success', { topicSetData
, ...this._resultLog(result
) });
896 return this._engineInfo(result
);
898 this.logger
.error(_scope
, 'failed', { error: e
, result
});
904 async
topicSetContent(dbCtx
, data
) {
905 const _scope
= _fileScope('topicSetContent');
906 const topicSetContentData
= {
909 httpLastModified: null,
913 ...topicSetContentData
,
914 content: common
.logTruncate(topicSetContentData
.content
, 100),
916 this.logger
.debug(_scope
, 'called', data
);
920 this._topicSetContentDataValidate(topicSetContentData
);
921 result
= await dbCtx
.result(this.statement
.topicSetContent
, topicSetContentData
);
922 logData
.result
= this._resultLog(result
);
923 if (result
.rowCount
!= 1) {
924 throw new DBErrors
.UnexpectedResult('did not set topic content');
926 result
= await dbCtx
.result(this.statement
.topicSetContentHistory
, {
927 topicId: data
.topicId
,
928 contentHash: data
.contentHash
,
929 contentSize: data
.content
.length
,
931 if (result
.rowCount
!= 1) {
932 throw new DBErrors
.UnexpectedResult('did not set topic content history');
934 this.logger
.debug(_scope
, 'success', { ...logData
});
935 return this._engineInfo(result
);
937 this.logger
.error(_scope
, 'failed', { error: e
, ...logData
});
943 async
topicUpdate(dbCtx
, data
) {
944 const _scope
= _fileScope('topicUpdate');
945 this.logger
.debug(_scope
, 'called', { data
});
948 leaseSecondsPreferred: null,
949 leaseSecondsMin: null,
950 leaseSecondsMax: null,
951 publisherValidationUrl: null,
955 this._topicUpdateDataValidate(topicData
);
959 result
= await dbCtx
.result(this.statement
.topicUpdate
, topicData
);
960 if (result
.rowCount
!= 1) {
961 throw new DBErrors
.UnexpectedResult('did not update topic');
964 this.logger
.error(_scope
, 'failed', { error: e
, topicData
});
970 async
verificationClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
971 const _scope
= _fileScope('verificationClaim');
972 this.logger
.debug(_scope
, 'called', { wanted
, claimTimeoutSeconds
});
976 await dbCtx
.txIf(async (txCtx
) => {
977 result
= await txCtx
.manyOrNone(this.statement
.verificationClaim
, { claimant
, wanted
, claimTimeoutSeconds
});
979 return result
.map((r
) => r
.id
);
981 this.logger
.error(_scope
, 'failed', { wanted
, claimTimeoutSeconds
});
988 async
verificationClaimById(dbCtx
, verificationId
, claimTimeoutSeconds
, claimant
) {
989 const _scope
= _fileScope('verificationClaimById');
990 this.logger
.debug(_scope
, 'called', { verificationId
, claimant
, claimTimeoutSeconds
});
994 await dbCtx
.txIf(async (txCtx
) => {
995 result
= await txCtx
.result(this.statement
.verificationClaimById
, { verificationId
, claimant
, claimTimeoutSeconds
});
997 return this._engineInfo(result
);
999 this.logger
.error(_scope
, 'failed', { verificationId
, claimant
, claimTimeoutSeconds
});
1005 async
verificationComplete(dbCtx
, verificationId
, callback
, topicId
) {
1006 const _scope
= _fileScope('verificationComplete');
1007 this.logger
.debug(_scope
, 'called', { verificationId
});
1011 await dbCtx
.txIf(async (txCtx
) => {
1012 result
= await txCtx
.result(this.statement
.verificationScrub
, { verificationId
, callback
, topicId
});
1013 if (result
.rowCount
< 1) {
1014 throw new DBErrors
.UnexpectedResult('did not remove verifications');
1018 this.logger
.error(_scope
, 'failed', { verificationId
});
1021 return this._engineInfo(result
);
1025 async
verificationGetById(dbCtx
, verificationId
) {
1026 const _scope
= _fileScope('verificationGetById');
1027 this.logger
.debug(_scope
, 'called', { verificationId
});
1031 verification
= await dbCtx
.oneOrNone(this.statement
.verificationGetById
, { verificationId
});
1032 return verification
;
1034 this.logger
.error(_scope
, 'failed', { error: e
, verificationId
});
1040 async
verificationIncomplete(dbCtx
, verificationId
, retryDelays
= [60]) {
1041 const _scope
= _fileScope('verificationIncomplete');
1042 this.logger
.debug(_scope
, 'called', { verificationId
});
1046 await dbCtx
.txIf(async (txCtx
) => {
1047 const { attempts
} = await txCtx
.one(this.statement
.verificationAttempts
, { verificationId
});
1048 const nextAttemptDelaySeconds
= common
.attemptRetrySeconds(attempts
, retryDelays
);
1049 result
= await txCtx
.result(this.statement
.verificationAttemptIncrement
, { verificationId
, nextAttemptDelaySeconds
});
1050 if (result
.rowCount
!= 1) {
1051 throw new DBErrors
.UnexpectedResult('did not update verification attempts');
1053 result
= await txCtx
.result(this.statement
.verificationDone
, { verificationId
});
1054 if (result
.rowCount
!= 1) {
1055 throw new DBErrors
.UnexpectedResult('did not release verification');
1059 this.logger
.error(_scope
, 'failed', { error: e
, verificationId
});
1065 async
verificationInsert(dbCtx
, verification
) {
1066 const _scope
= _fileScope('verificationInsert');
1067 this.logger
.debug(_scope
, 'called', { verification
});
1069 const verificationData
= {
1071 httpRemoteAddr: null,
1077 let result
, verificationId
;
1079 this._verificationDataValidate(verificationData
);
1080 result
= await dbCtx
.result(this.statement
.verificationInsert
, verificationData
);
1081 if (result
.rowCount
!= 1) {
1082 throw new DBErrors
.UnexpectedResult('did not insert verification');
1084 verificationId
= result
.rows
[0].id
;
1085 this.logger
.debug(_scope
, 'inserted verification', { verificationId
});
1087 return verificationId
;
1089 this.logger
.error(_scope
, 'failed', { error: e
, verificationData
});
1095 async
verificationRelease(dbCtx
, verificationId
) {
1096 const _scope
= _fileScope('verificationRelease');
1097 this.logger
.debug(_scope
, 'called', { verificationId
});
1101 result
= await dbCtx
.result(this.statement
.verificationDone
, { verificationId
});
1102 if (result
.rowCount
!= 1) {
1103 throw new DBErrors
.UnexpectedResult('did not release verification');
1105 return this._engineInfo(result
);
1107 this.logger
.error(_scope
, 'failed', { error: e
, verificationId
});
1113 async
verificationUpdate(dbCtx
, verificationId
, data
) {
1114 const _scope
= _fileScope('verificationUpdate');
1115 this.logger
.debug(_scope
, 'called', { verificationId
, data
});
1117 const verificationData
= {
1125 this._verificationUpdateDataValidate(verificationData
);
1126 result
= await dbCtx
.result(this.statement
.verificationUpdate
, verificationData
);
1127 if (result
.rowCount
!= 1) {
1128 throw new DBErrors
.UnexpectedResult('did not update verification');
1131 this.logger
.error(_scope
, 'failed', { error: e
, verificationData
});
1137 async
verificationValidated(dbCtx
, verificationId
) {
1138 const _scope
= _fileScope('verificationValidated');
1139 this.logger
.debug(_scope
, 'called', { verificationId
});
1143 result
= await dbCtx
.result(this.statement
.verificationValidate
, { verificationId
});
1144 if (result
.rowCount
!= 1) {
1145 throw new DBErrors
.UnexpectedResult('did not set verification validation');
1148 this.logger
.error(_scope
, 'failed', { error: e
, verificationId
});
1155 module
.exports
= DatabasePostgres
;