1 /* eslint-disable security/detect-object-injection */
4 const pgpInitOptions
= {
8 const path
= require('path');
9 const pgp
= require('pg-promise')(pgpInitOptions
);
10 const svh
= require('../schema-version-helper');
11 const Database
= require('../base');
12 const DBErrors
= require('../errors');
13 const Listener
= require('./listener');
14 const common
= require('../../common');
16 const _fileScope
= common
.fileScope(__filename
);
18 const PGTypeIdINT8
= 20; // Type Id 20 == INT8 (BIGINT)
19 const PGTYpeIdINT8Array
= 1016; //Type Id 1016 == INT8[] (BIGINT[])
20 pgp
.pg
.types
.setTypeParser(PGTypeIdINT8
, BigInt
); // Type Id 20 = INT8 (BIGINT)
21 const parseBigIntArray
= pgp
.pg
.types
.getTypeParser(PGTYpeIdINT8Array
); // Type Id 1016 = INT8[] (BIGINT[])
22 pgp
.pg
.types
.setTypeParser(PGTYpeIdINT8Array
, (a
) => parseBigIntArray(a
).map(BigInt
));
24 const schemaVersionsSupported
= {
37 class DatabasePostgres
extends Database
{
38 constructor(logger
, options
, _pgp
= pgp
) {
39 super(logger
, options
);
41 this.db
= _pgp(options
.db
.connectionString
);
42 this.schemaVersionsSupported
= schemaVersionsSupported
;
44 // Suppress QF warnings when running tests
45 this.noWarnings
= options
.db
.noWarnings
;
47 if (options
.db
.cacheEnabled
) {
48 this.listener
= new Listener(logger
, this.db
, {
49 ...options
.db
.listener
,
50 channel: 'topic_changed',
51 dataCallback: this._topicChanged
.bind(this),
52 connectionEstablishedCallback: this._listenerEstablished
.bind(this),
53 connectionLostCallback: this._listenerLost
.bind(this),
58 const queryLogLevel
= options
.db
.queryLogLevel
;
60 pgpInitOptions
.query
= (event
) => {
61 // Quell outgoing pings
62 if (event
?.query
?.startsWith('NOTIFY')) {
65 this.logger
[queryLogLevel
](_fileScope('pgp:query'), '', { ...common
.pick(event
|| {}, ['query', 'params']) });
70 pgpInitOptions
.error
= (err
, event
) => {
71 this.logger
.error(_fileScope('pgp:error'), '', { err
, event
});
73 // TODO: close connection on err.code === '57P03' database shutting down
76 // Deophidiate column names in-place, log results
77 pgpInitOptions
.receive
= ({ data
, result
, ctx: event
}) => {
78 const exemplaryRow
= data
[0];
79 for (const prop
in exemplaryRow
) {
80 const camel
= Database
._camelfy(prop
);
81 if (!(camel
in exemplaryRow
)) {
82 for (const d
of data
) {
89 // Quell outgoing pings
90 if (result
&& result
.command
=== 'NOTIFY') {
94 const resultLog
= common
.pick(result
|| {}, ['command', 'rowCount', 'duration']);
95 this.logger
[queryLogLevel
](_fileScope('pgp:result'), '', { query: event
.query
, ...resultLog
});
99 // Expose these for test coverage
100 this.pgpInitOptions
= pgpInitOptions
;
103 this._initStatements(_pgp
);
107 _queryFileHelper(_pgp
) {
109 const _scope
= _fileScope('_queryFile');
110 /* istanbul ignore next */
113 ...(this.noWarnings
&& { noWarnings: this.noWarnings
}),
115 const qf
= new _pgp
.QueryFile(file
, qfParams
);
117 this.logger
.error(_scope
, 'failed to create SQL statement', { error: qf
.error
, file
});
125 async
initialize(applyMigrations
= true) {
126 const _scope
= _fileScope('initialize');
127 this.logger
.debug(_scope
, 'called', { applyMigrations
});
128 if (applyMigrations
) {
129 await
this._initTables();
131 await
super.initialize();
133 await
this.listener
.start();
138 async
_initTables(_pgp
) {
139 const _scope
= _fileScope('_initTables');
140 this.logger
.debug(_scope
, 'called', {});
142 const _queryFile
= this._queryFileHelper(_pgp
|| this._pgp
);
144 // Migrations rely upon this table, ensure it exists.
145 const metaVersionTable
= '_meta_schema_version';
147 const tableExists
= async (name
) => this.db
.oneOrNone('SELECT table_name FROM information_schema.tables WHERE table_name=$(name)', { name
});
148 let metaExists
= await
tableExists(metaVersionTable
);
150 const fPath
= path
.join(__dirname
, 'sql', 'schema', 'init.sql');
151 const initSql
= _queryFile(fPath
);
152 const results
= await
this.db
.multiResult(initSql
);
153 this.logger
.debug(_scope
, 'executed init sql', { results
});
154 metaExists
= await
tableExists(metaVersionTable
);
155 /* istanbul ignore if */
157 throw new DBErrors
.UnexpectedResult(`did not create ${metaVersionTable} table`);
159 this.logger
.info(_scope
, 'created schema version table', { metaVersionTable
});
163 const currentSchema
= await
this._currentSchema();
164 const migrationsWanted
= svh
.unappliedSchemaVersions(__dirname
, currentSchema
, this.schemaVersionsSupported
);
165 this.logger
.debug(_scope
, 'schema migrations wanted', { migrationsWanted
});
166 for (const v
of migrationsWanted
) {
167 const fPath
= path
.join(__dirname
, 'sql', 'schema', v
, 'apply.sql');
169 const migrationSql
= _queryFile(fPath
);
170 this.logger
.debug(_scope
, 'applying migration', { version: v
});
171 const results
= await
this.db
.multiResult(migrationSql
);
172 this.logger
.debug(_scope
, 'migration results', { results
});
173 this.logger
.info(_scope
, 'applied migration', { version: v
});
175 this.logger
.error(_scope
, 'migration failed', { error: e
, fPath
, version: v
});
182 _initStatements(_pgp
) {
183 const _scope
= _fileScope('_initStatements');
184 const _queryFile
= this._queryFileHelper(_pgp
);
185 this.statement
= _pgp
.utils
.enumSql(path
.join(__dirname
, 'sql'), {}, _queryFile
);
186 this.logger
.debug(_scope
, 'statements initialized', { statements: Object
.keys(this.statement
).length
});
190 async
healthCheck() {
191 const _scope
= _fileScope('healthCheck');
192 this.logger
.debug(_scope
, 'called', {});
193 const c
= await
this.db
.connect();
195 return { serverVersion: c
.client
.serverVersion
};
199 async
_currentSchema() {
200 return this.db
.one('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1');
204 async
_closeConnection() {
205 const _scope
= _fileScope('_closeConnection');
208 await
this.listener
.stop();
210 await
this._pgp
.end();
212 this.logger
.error(_scope
, 'failed', { error: e
});
218 /* istanbul ignore next */
219 async
_purgeTables(really
= false) {
220 const _scope
= _fileScope('_purgeTables');
223 await
this.db
.tx(async (t
) => {
226 // 'topic_fetch_in_progress',
228 // 'verification_in_progress',
230 // 'subscription_delivery_in_progress',
231 ].map(async (table
) => t
.query('TRUNCATE TABLE $(table:name) CASCADE', { table
})));
235 this.logger
.error(_scope
, 'failed', { error: e
});
241 // eslint-disable-next-line class-methods-use-this
242 _engineInfo(result
) {
244 changes: result
.rowCount
,
245 lastInsertRowid: result
.rows
.length
? result
.rows
[0].id : undefined,
246 duration: result
.duration
,
251 // eslint-disable-next-line class-methods-use-this
253 return common
.pick(result
, ['command', 'rowCount', 'duration']);
258 * Receive notices when topic entry is updated.
259 * Clear relevant cache entry.
260 * @param {string} payload topic changed event
262 _topicChanged(payload
) {
263 const _scope
= _fileScope('_topicChanged');
264 if (payload
!== 'ping') {
265 this.logger
.debug(_scope
, 'called', { payload
});
266 this.cache
.delete(payload
);
272 * Called when a listener connection is opened.
275 _listenerEstablished() {
276 const _scope
= _fileScope('_listenerEstablished');
277 this.logger
.debug(_scope
, 'called', {});
278 this.cache
= new Map();
283 * Called when a listener connection is closed.
287 const _scope
= _fileScope('_listenerLost');
288 this.logger
.debug(_scope
, 'called', {});
294 * Return a cached entry, if available.
296 * @returns {object=} cached data
299 const _scope
= _fileScope('_cacheGet');
300 if (this.cache
?.has(key
)) {
301 const cacheEntry
= this.cache
.get(key
);
302 this.logger
.debug(_scope
, 'found cache entry', { key
, ...common
.pick(cacheEntry
, ['added', 'hits', 'lastHit']) });
303 cacheEntry
.hits
+= 1;
304 cacheEntry
.lastHit
= new Date();
305 return cacheEntry
.data
;
311 * Store an entry in cache, if available.
313 * @param {*} data data
315 _cacheSet(key
, data
) {
316 const _scope
= _fileScope('_cacheSet');
318 this.cache
.set(key
, {
324 this.logger
.debug(_scope
, 'added cache entry', { key
});
330 return this.db
.task(async (t
) => fn(t
));
334 // eslint-disable-next-line class-methods-use-this
335 async
transaction(dbCtx
, fn
) {
336 return dbCtx
.txIf(async (t
) => fn(t
));
340 async
authenticationSuccess(dbCtx
, identifier
) {
341 const _scope
= _fileScope('authenticationSuccess');
342 this.logger
.debug(_scope
, 'called', { identifier
});
346 result
= await dbCtx
.result(this.statement
.authenticationSuccess
, { identifier
});
347 if (result
.rowCount
!= 1) {
348 throw new DBErrors
.UnexpectedResult('did not update authentication success event');
351 this.logger
.error(_scope
, 'failed', { error: e
, identifier
});
357 async
authenticationGet(dbCtx
, identifier
) {
358 const _scope
= _fileScope('authenticationGet');
359 this.logger
.debug(_scope
, 'called', { identifier
});
363 auth
= await dbCtx
.oneOrNone(this.statement
.authenticationGet
, { identifier
});
366 this.logger
.error(_scope
, 'failed', { error: e
, identifier
});
372 async
authenticationUpsert(dbCtx
, identifier
, credential
, otpKey
) {
373 const _scope
= _fileScope('authenticationUpsert');
374 const scrubbedCredential
= '*'.repeat((credential
|| '').length
);
375 const scrubbedOTPKey
= '*'.repeat((otpKey
|| '').length
) || null;
376 this.logger
.debug(_scope
, 'called', { identifier
, scrubbedCredential
, scrubbedOTPKey
});
380 result
= await dbCtx
.result(this.statement
.authenticationUpsert
, { identifier
, credential
, otpKey
});
381 if (result
.rowCount
!= 1) {
382 throw new DBErrors
.UnexpectedResult('did not upsert authentication');
385 this.logger
.error(_scope
, 'failed', { error: e
, identifier
, scrubbedCredential
, scrubbedOTPKey
});
391 async
authenticationUpdateCredential(dbCtx
, identifier
, credential
) {
392 const _scope
= _fileScope('authenticationUpdateCredential');
393 const scrubbedCredential
= '*'.repeat((credential
|| '').length
);
394 this.logger
.debug(_scope
, 'called', { identifier
, scrubbedCredential
});
398 result
= await dbCtx
.result(this.statement
.authenticationUpdateCredential
, { identifier
, credential
});
399 if (result
.rowCount
!= 1) {
400 throw new DBErrors
.UnexpectedResult('did not update authentication credential');
403 this.logger
.error(_scope
, 'failed', { error: e
, identifier
, scrubbedCredential
});
409 async
authenticationUpdateOTPKey(dbCtx
, identifier
, otpKey
) {
410 const _scope
= _fileScope('authenticationUpdateOTPKey');
411 const scrubbedOTPKey
= '*'.repeat((otpKey
|| '').length
) || null;
412 this.logger
.debug(_scope
, 'called', { identifier
, scrubbedOTPKey
});
416 result
= await dbCtx
.result(this.statement
.authenticationUpdateOtpKey
, { identifier
, otpKey
});
417 if (result
.rowCount
!= 1) {
418 throw new DBErrors
.UnexpectedResult('did not update authentication otp key');
421 this.logger
.error(_scope
, 'failed', { error: e
, identifier
, scrubbedOTPKey
});
427 async
subscriptionsByTopicId(dbCtx
, topicId
) {
428 const _scope
= _fileScope('subscriptionsByTopicId');
429 this.logger
.debug(_scope
, 'called', { topicId
});
433 count
= await dbCtx
.manyOrNone(this.statement
.subscriptionsByTopicId
, { topicId
});
436 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
442 async
subscriptionCountByTopicUrl(dbCtx
, topicUrl
) {
443 const _scope
= _fileScope('subscriptionCountByTopicUrl');
444 this.logger
.debug(_scope
, 'called', { topicUrl
});
448 count
= await dbCtx
.one(this.statement
.subscriptionCountByTopicUrl
, { topicUrl
});
451 this.logger
.error(_scope
, 'failed', { error: e
, topicUrl
});
457 async
subscriptionDelete(dbCtx
, callback
, topicId
) {
458 const _scope
= _fileScope('subscriptionDelete');
459 this.logger
.debug(_scope
, 'called', { callback
, topicId
});
462 const result
= await dbCtx
.result(this.statement
.subscriptionDelete
, { callback
, topicId
});
463 return this._engineInfo(result
);
465 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
});
471 async
subscriptionDeleteExpired(dbCtx
, topicId
) {
472 const _scope
= _fileScope('subscriptionDeleteExpired');
473 this.logger
.debug(_scope
, 'called', { topicId
});
476 const result
= await dbCtx
.result(this.statement
.subscriptionDeleteExpired
, { topicId
});
477 this.logger
.debug(_scope
, 'success', { topicId
, deleted: result
.rowCount
});
478 return this._engineInfo(result
);
480 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
486 async
subscriptionDeliveryClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
487 const _scope
= _fileScope('subscriptionDeliveryClaim');
488 this.logger
.debug(_scope
, 'called', { wanted
, claimTimeoutSeconds
, claimant
});
491 const claims
= await dbCtx
.txIf(async (txCtx
) => {
492 return txCtx
.manyOrNone(this.statement
.subscriptionDeliveryClaim
, { claimant
, wanted
, claimTimeoutSeconds
});
494 return claims
.map((r
) => r
.id
);
496 this.logger
.error(_scope
, 'failed', { error: e
, claimant
, wanted
, claimTimeoutSeconds
});
502 async
subscriptionDeliveryClaimById(dbCtx
, subscriptionId
, claimTimeoutSeconds
, claimant
) {
503 const _scope
= _fileScope('subscriptionDeliveryClaimById');
504 this.logger
.debug(_scope
, 'called', { subscriptionId
, claimTimeoutSeconds
, claimant
});
508 result
= await dbCtx
.txIf(async (txCtx
) => {
509 result
= await txCtx
.result(this.statement
.subscriptionDeliveryClaimById
, { claimant
, subscriptionId
, claimTimeoutSeconds
});
510 if (result
.rowCount
!= 1) {
511 throw new DBErrors
.UnexpectedResult('did not claim subscription delivery');
515 return this._engineInfo(result
);
517 this.logger
.error(_scope
, 'failed', { error: e
, claimant
, subscriptionId
, claimTimeoutSeconds
});
523 async
subscriptionDeliveryComplete(dbCtx
, callback
, topicId
, topicContentUpdated
) {
524 const _scope
= _fileScope('subscriptionDeliveryComplete');
525 this.logger
.debug(_scope
, 'called', { callback
, topicId
, topicContentUpdated
});
529 await dbCtx
.txIf(async (txCtx
) => {
530 result
= await txCtx
.result(this.statement
.subscriptionDeliverySuccess
, { callback
, topicId
, topicContentUpdated
});
531 if (result
.rowCount
!= 1) {
532 throw new DBErrors
.UnexpectedResult('did not set subscription delivery success');
534 result
= await txCtx
.result(this.statement
.subscriptionDeliveryDone
, { callback
, topicId
});
535 if (result
.rowCount
!= 1) {
536 throw new DBErrors
.UnexpectedResult('did not release subscription delivery');
540 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
, topicContentUpdated
});
546 async
subscriptionDeliveryGone(dbCtx
, callback
, topicId
) {
547 const _scope
= _fileScope('subscriptionDeliveryGone');
548 this.logger
.debug(_scope
, 'called', { callback
, topicId
});
552 await dbCtx
.txIf(async (txCtx
) => {
553 result
= await txCtx
.result(this.statement
.subscriptionDelete
, { callback
, topicId
});
554 if (result
.rowCount
!= 1) {
555 throw new DBErrors
.UnexpectedResult('did not delete subscription');
557 // Delete cascades to delivery
558 // result = await txCtx.result(this.statement.subscriptionDeliveryDone, { callback, topicId });
559 // if (result.rowCount != 1) {
560 // throw new DBErrors.UnexpectedResult('did not release subscription delivery');
564 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
});
570 async
subscriptionDeliveryIncomplete(dbCtx
, callback
, topicId
, retryDelays
= [60]) {
571 const _scope
= _fileScope('subscriptionDeliveryIncomplete');
572 this.logger
.debug(_scope
, 'called', { callback
, topicId
, retryDelays
});
576 await dbCtx
.txIf(async (txCtx
) => {
577 const { currentAttempt
} = await txCtx
.one(this.statement
.subscriptionDeliveryAttempts
, { callback
, topicId
});
578 const nextAttemptDelaySeconds
= common
.attemptRetrySeconds(currentAttempt
, retryDelays
);
579 result
= await txCtx
.result(this.statement
.subscriptionDeliveryFailure
, { nextAttemptDelaySeconds
, callback
, topicId
});
580 if (result
.rowCount
!= 1) {
581 throw new DBErrors
.UnexpectedResult('did not set subscription delivery failure');
583 result
= await txCtx
.result(this.statement
.subscriptionDeliveryDone
, { callback
, topicId
});
584 if (result
.rowCount
!= 1) {
585 throw new DBErrors
.UnexpectedResult('did not release subscription delivery');
589 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
});
595 async
subscriptionGet(dbCtx
, callback
, topicId
) {
596 const _scope
= _fileScope('subscriptionGet');
597 this.logger
.debug(_scope
, 'called', { callback
, topicId
});
601 subscription
= await dbCtx
.oneOrNone(this.statement
.subscriptionGet
, { callback
, topicId
});
604 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
});
610 async
subscriptionGetById(dbCtx
, subscriptionId
) {
611 const _scope
= _fileScope('subscriptionGetById');
612 this.logger
.debug(_scope
, 'called', { subscriptionId
});
616 subscription
= await dbCtx
.oneOrNone(this.statement
.subscriptionGetById
, { subscriptionId
});
619 this.logger
.error(_scope
, 'failed', { error: e
, subscriptionId
});
625 async
subscriptionUpdate(dbCtx
, data
) {
626 const _scope
= _fileScope('subscriptionUpdate');
627 this.logger
.debug(_scope
, 'called', { data
});
629 const subscriptionData
= {
633 this._subscriptionUpdateDataValidate(subscriptionData
);
637 result
= await dbCtx
.result(this.statement
.subscriptionUpdate
, subscriptionData
);
638 if (result
.rowCount
!= 1) {
639 throw new DBErrors
.UnexpectedResult('did not update subscription');
642 this.logger
.error(_scope
, 'failed', { error: e
, subscriptionData
});
648 async
subscriptionUpsert(dbCtx
, data
) {
649 const _scope
= _fileScope('subscriptionUpsert');
650 this.logger
.debug(_scope
, 'called', { ...data
});
652 const subscriptionData
= {
654 httpRemoteAddr: null,
658 this._subscriptionUpsertDataValidate(subscriptionData
);
662 result
= await dbCtx
.result(this.statement
.subscriptionUpsert
, subscriptionData
);
663 if (result
.rowCount
!= 1) {
664 throw new DBErrors
.UnexpectedResult('did not upsert subscription');
666 return this._engineInfo(result
);
668 this.logger
.error(_scope
, 'failed', { error: e
, subscriptionData
});
674 async
topicDeleted(dbCtx
, topicId
) {
675 const _scope
= _fileScope('topicDeleted');
676 this.logger
.debug(_scope
, 'called', { topicId
});
680 result
= await dbCtx
.result(this.statement
.topicDeleted
, { topicId
});
681 if (result
.rowCount
!= 1) {
682 throw new DBErrors
.UnexpectedResult('did not update topic as deleted');
685 this.logger
.error(_scope
, 'failed to update topic as deleted', { error: e
, topicId
});
691 async
topicFetchClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
692 const _scope
= _fileScope('topicFetchClaim');
693 this.logger
.debug(_scope
, 'called', { wanted
, claimTimeoutSeconds
});
697 await dbCtx
.txIf(async (txCtx
) => {
698 claims
= await txCtx
.manyOrNone(this.statement
.topicContentFetchClaim
, { claimant
, wanted
, claimTimeoutSeconds
});
700 return claims
.map((r
) => r
.id
);
702 this.logger
.error(_scope
, 'failed to claim topics for fetch', { error: e
});
708 async
topicFetchClaimById(dbCtx
, topicId
, claimTimeoutSeconds
, claimant
) {
709 const _scope
= _fileScope('topicFetchClaimById');
710 this.logger
.debug(_scope
, 'called', { topicId
, claimTimeoutSeconds
, claimant
});
714 await dbCtx
.txIf(async (txCtx
) => {
715 result
= await txCtx
.result(this.statement
.topicContentFetchClaimById
, { topicId
, claimant
, claimTimeoutSeconds
});
717 return this._engineInfo(result
);
719 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
725 async
topicFetchComplete(dbCtx
, topicId
) {
726 const _scope
= _fileScope('topicFetchComplete');
727 this.logger
.debug(_scope
, 'called', { topicId
});
731 await dbCtx
.txIf(async (txCtx
) => {
732 result
= await txCtx
.result(this.statement
.topicAttemptsReset
, { topicId
});
733 if (result
.rowCount
!= 1) {
734 throw new DBErrors
.UnexpectedResult('did not reset topic attempts');
736 result
= await txCtx
.result(this.statement
.topicContentFetchDone
, { topicId
});
737 if (result
.rowCount
!= 1) {
738 throw new DBErrors
.UnexpectedResult('did not release topic fetch');
741 this.logger
.debug(_scope
, 'success', { topicId
, ...this._resultLog(result
) });
742 return this._engineInfo(result
);
744 this.logger
.error(_scope
, 'failed', { error: e
, result
, topicId
});
750 async
topicFetchIncomplete(dbCtx
, topicId
, retryDelays
= [60]) {
751 const _scope
= _fileScope('topicFetchIncomplete');
752 this.logger
.debug(_scope
, 'called', { topicId
});
756 result
= await dbCtx
.txIf(async (txCtx
) => {
757 const { contentFetchAttemptsSinceSuccess: currentAttempt
} = await txCtx
.one(this.statement
.topicAttempts
, { topicId
});
758 const nextAttemptDelaySeconds
= common
.attemptRetrySeconds(currentAttempt
, retryDelays
);
759 result
= await txCtx
.result(this.statement
.topicAttemptsIncrement
, { topicId
, nextAttemptDelaySeconds
});
760 if (result
.rowCount
!= 1) {
761 throw new DBErrors
.UnexpectedResult('did not set topic attempts');
763 result
= await txCtx
.result(this.statement
.topicContentFetchDone
, { topicId
});
764 if (result
.rowCount
!= 1) {
765 throw new DBErrors
.UnexpectedResult('did not release topic fetch');
769 this.logger
.debug(_scope
, 'success', { topicId
, ...this._resultLog(result
) });
770 return this._engineInfo(result
);
772 this.logger
.error(_scope
, 'failed', { error: e
, result
, topicId
});
778 async
topicFetchRequested(dbCtx
, topicId
) {
779 const _scope
= _fileScope('topicFetchRequested');
780 this.logger
.debug(_scope
, 'called', { topicId
});
784 result
= await dbCtx
.result(this.statement
.topicContentFetchRequested
, { topicId
});
785 if (result
.rowCount
!= 1) {
786 throw new DBErrors
.UnexpectedResult('did not set topic fetch requested');
788 this.logger
.debug(_scope
, 'success', { topicId
, ...this._resultLog(result
) });
789 return this._engineInfo(result
);
791 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
797 async
topicGetAll(dbCtx
) {
798 const _scope
= _fileScope('topicGetAll');
799 this.logger
.debug(_scope
, 'called');
803 topics
= await dbCtx
.manyOrNone(this.statement
.topicGetInfoAll
);
805 this.logger
.error(_scope
, 'failed', { error: e
, topics
});
809 topics
= topics
.map(this._topicDefaults
.bind(this));
815 async
topicGetById(dbCtx
, topicId
, applyDefaults
= true) {
816 const _scope
= _fileScope('topicGetById');
817 this.logger
.debug(_scope
, 'called', { topicId
});
821 topic
= await dbCtx
.oneOrNone(this.statement
.topicGetById
, { topicId
});
823 topic
= this._topicDefaults(topic
);
827 this.logger
.error(_scope
, 'failed', { error: e
, topic
, topicId
});
833 async
topicGetByUrl(dbCtx
, topicUrl
, applyDefaults
= true) {
834 const _scope
= _fileScope('topicGetByUrl');
835 this.logger
.debug(_scope
, 'called', { topicUrl
});
839 topic
= await dbCtx
.oneOrNone(this.statement
.topicGetByUrl
, { topicUrl
});
841 topic
= this._topicDefaults(topic
);
845 this.logger
.error(_scope
, 'failed', { error: e
, topic
, topicUrl
});
851 async
topicGetContentById(dbCtx
, topicId
) {
852 const _scope
= _fileScope('topicGetContentById');
853 this.logger
.debug(_scope
, 'called', { topicId
});
857 topic
= this._cacheGet(topicId
);
861 topic
= await dbCtx
.oneOrNone(this.statement
.topicGetContentById
, { topicId
});
862 const topicWithDefaults
= this._topicDefaults(topic
);
863 this._cacheSet(topicId
, topicWithDefaults
);
864 return topicWithDefaults
;
866 this.logger
.error(_scope
, 'failed', { error: e
, topic
, topicId
});
872 async
topicPendingDelete(dbCtx
, topicId
) {
873 const _scope
= _fileScope('topicPendingDelete');
874 this.logger
.debug(_scope
, 'called', { topicId
});
877 await dbCtx
.txIf(async (txCtx
) => {
878 const topic
= await txCtx
.one(this.statement
.topicGetById
, { topicId
});
879 if (!topic
.isDeleted
) {
880 this.logger
.debug(_scope
, 'topic not set deleted, not deleting', { topicId
});
884 const { count: subscriberCount
} = await txCtx
.one(this.statement
.subscriptionCountByTopicUrl
, { topicUrl: topic
.url
});
885 if (subscriberCount
) {
886 this.logger
.debug(_scope
, 'topic has subscribers, not deleting', { topicId
, subscriberCount
});
890 const result
= await txCtx
.result(this.statement
.topicDeleteById
, { topicId
});
891 if (result
.rowCount
!== 1) {
892 throw new DBErrors
.UnexpectedResult('did not delete topic');
895 this.logger
.debug(_scope
, 'success', { topicId
});
897 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
903 async
topicPublishHistory(dbCtx
, topicId
, days
) {
904 const _scope
= _fileScope('topicPublishHistory');
905 this.logger
.debug(_scope
, 'called', { topicId
, days
});
907 const events
= await dbCtx
.manyOrNone(this.statement
.topicPublishHistory
, { topicIds: [topicId
], daysAgo: days
});
908 const history
= Array
.from({ length: days
}, () => 0);
909 events
.forEach(({ daysAgo
, contentUpdates
}) => history
[daysAgo
] = Number(contentUpdates
));
915 async
topicSet(dbCtx
, data
) {
916 const _scope
= _fileScope('topicSet');
917 this.logger
.debug(_scope
, 'called', data
);
919 const topicSetData
= {
920 publisherValidationUrl: null,
921 leaseSecondsPreferred: null,
922 leaseSecondsMin: null,
923 leaseSecondsMax: null,
929 this._topicSetDataValidate(topicSetData
);
930 result
= await dbCtx
.result(this.statement
.topicUpsert
, topicSetData
);
931 if (result
.rowCount
!= 1) {
932 throw new DBErrors
.UnexpectedResult('did not set topic data');
934 this.logger
.debug(_scope
, 'success', { topicSetData
, ...this._resultLog(result
) });
935 return this._engineInfo(result
);
937 this.logger
.error(_scope
, 'failed', { error: e
, result
});
943 async
topicSetContent(dbCtx
, data
) {
944 const _scope
= _fileScope('topicSetContent');
945 const topicSetContentData
= {
948 httpLastModified: null,
952 ...topicSetContentData
,
953 content: common
.logTruncate(topicSetContentData
.content
, 100),
955 this.logger
.debug(_scope
, 'called', data
);
959 this._topicSetContentDataValidate(topicSetContentData
);
960 result
= await dbCtx
.result(this.statement
.topicSetContent
, topicSetContentData
);
961 logData
.result
= this._resultLog(result
);
962 if (result
.rowCount
!= 1) {
963 throw new DBErrors
.UnexpectedResult('did not set topic content');
965 result
= await dbCtx
.result(this.statement
.topicSetContentHistory
, {
966 topicId: data
.topicId
,
967 contentHash: data
.contentHash
,
968 contentSize: data
.content
.length
,
970 if (result
.rowCount
!= 1) {
971 throw new DBErrors
.UnexpectedResult('did not set topic content history');
973 this.logger
.debug(_scope
, 'success', { ...logData
});
974 return this._engineInfo(result
);
976 this.logger
.error(_scope
, 'failed', { error: e
, ...logData
});
982 async
topicUpdate(dbCtx
, data
) {
983 const _scope
= _fileScope('topicUpdate');
984 this.logger
.debug(_scope
, 'called', { data
});
987 leaseSecondsPreferred: null,
988 leaseSecondsMin: null,
989 leaseSecondsMax: null,
990 publisherValidationUrl: null,
994 this._topicUpdateDataValidate(topicData
);
998 result
= await dbCtx
.result(this.statement
.topicUpdate
, topicData
);
999 if (result
.rowCount
!= 1) {
1000 throw new DBErrors
.UnexpectedResult('did not update topic');
1003 this.logger
.error(_scope
, 'failed', { error: e
, topicData
});
1009 async
verificationClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
1010 const _scope
= _fileScope('verificationClaim');
1011 this.logger
.debug(_scope
, 'called', { wanted
, claimTimeoutSeconds
});
1015 await dbCtx
.txIf(async (txCtx
) => {
1016 result
= await txCtx
.manyOrNone(this.statement
.verificationClaim
, { claimant
, wanted
, claimTimeoutSeconds
});
1018 return result
.map((r
) => r
.id
);
1020 this.logger
.error(_scope
, 'failed', { wanted
, claimTimeoutSeconds
});
1027 async
verificationClaimById(dbCtx
, verificationId
, claimTimeoutSeconds
, claimant
) {
1028 const _scope
= _fileScope('verificationClaimById');
1029 this.logger
.debug(_scope
, 'called', { verificationId
, claimant
, claimTimeoutSeconds
});
1033 await dbCtx
.txIf(async (txCtx
) => {
1034 result
= await txCtx
.result(this.statement
.verificationClaimById
, { verificationId
, claimant
, claimTimeoutSeconds
});
1036 return this._engineInfo(result
);
1038 this.logger
.error(_scope
, 'failed', { verificationId
, claimant
, claimTimeoutSeconds
});
1044 async
verificationComplete(dbCtx
, verificationId
, callback
, topicId
) {
1045 const _scope
= _fileScope('verificationComplete');
1046 this.logger
.debug(_scope
, 'called', { verificationId
});
1050 await dbCtx
.txIf(async (txCtx
) => {
1051 result
= await txCtx
.result(this.statement
.verificationScrub
, { verificationId
, callback
, topicId
});
1052 if (result
.rowCount
< 1) {
1053 throw new DBErrors
.UnexpectedResult('did not remove verifications');
1057 this.logger
.error(_scope
, 'failed', { verificationId
});
1060 return this._engineInfo(result
);
1064 async
verificationGetById(dbCtx
, verificationId
) {
1065 const _scope
= _fileScope('verificationGetById');
1066 this.logger
.debug(_scope
, 'called', { verificationId
});
1070 verification
= await dbCtx
.oneOrNone(this.statement
.verificationGetById
, { verificationId
});
1071 return verification
;
1073 this.logger
.error(_scope
, 'failed', { error: e
, verificationId
});
1079 async
verificationIncomplete(dbCtx
, verificationId
, retryDelays
= [60]) {
1080 const _scope
= _fileScope('verificationIncomplete');
1081 this.logger
.debug(_scope
, 'called', { verificationId
});
1085 await dbCtx
.txIf(async (txCtx
) => {
1086 const { attempts
} = await txCtx
.one(this.statement
.verificationAttempts
, { verificationId
});
1087 const nextAttemptDelaySeconds
= common
.attemptRetrySeconds(attempts
, retryDelays
);
1088 result
= await txCtx
.result(this.statement
.verificationAttemptIncrement
, { verificationId
, nextAttemptDelaySeconds
});
1089 if (result
.rowCount
!= 1) {
1090 throw new DBErrors
.UnexpectedResult('did not update verification attempts');
1092 result
= await txCtx
.result(this.statement
.verificationDone
, { verificationId
});
1093 if (result
.rowCount
!= 1) {
1094 throw new DBErrors
.UnexpectedResult('did not release verification');
1098 this.logger
.error(_scope
, 'failed', { error: e
, verificationId
});
1104 async
verificationInsert(dbCtx
, verification
) {
1105 const _scope
= _fileScope('verificationInsert');
1106 this.logger
.debug(_scope
, 'called', { verification
});
1108 const verificationData
= {
1110 httpRemoteAddr: null,
1116 let result
, verificationId
;
1118 this._verificationDataValidate(verificationData
);
1119 result
= await dbCtx
.result(this.statement
.verificationInsert
, verificationData
);
1120 if (result
.rowCount
!= 1) {
1121 throw new DBErrors
.UnexpectedResult('did not insert verification');
1123 verificationId
= result
.rows
[0].id
;
1124 this.logger
.debug(_scope
, 'inserted verification', { verificationId
});
1126 return verificationId
;
1128 this.logger
.error(_scope
, 'failed', { error: e
, verificationData
});
1134 async
verificationRelease(dbCtx
, verificationId
) {
1135 const _scope
= _fileScope('verificationRelease');
1136 this.logger
.debug(_scope
, 'called', { verificationId
});
1140 result
= await dbCtx
.result(this.statement
.verificationDone
, { verificationId
});
1141 if (result
.rowCount
!= 1) {
1142 throw new DBErrors
.UnexpectedResult('did not release verification');
1144 return this._engineInfo(result
);
1146 this.logger
.error(_scope
, 'failed', { error: e
, verificationId
});
1152 async
verificationUpdate(dbCtx
, verificationId
, data
) {
1153 const _scope
= _fileScope('verificationUpdate');
1154 this.logger
.debug(_scope
, 'called', { verificationId
, data
});
1156 const verificationData
= {
1164 this._verificationUpdateDataValidate(verificationData
);
1165 result
= await dbCtx
.result(this.statement
.verificationUpdate
, verificationData
);
1166 if (result
.rowCount
!= 1) {
1167 throw new DBErrors
.UnexpectedResult('did not update verification');
1170 this.logger
.error(_scope
, 'failed', { error: e
, verificationData
});
1176 async
verificationValidated(dbCtx
, verificationId
) {
1177 const _scope
= _fileScope('verificationValidated');
1178 this.logger
.debug(_scope
, 'called', { verificationId
});
1182 result
= await dbCtx
.result(this.statement
.verificationValidate
, { verificationId
});
1183 if (result
.rowCount
!= 1) {
1184 throw new DBErrors
.UnexpectedResult('did not set verification validation');
1187 this.logger
.error(_scope
, 'failed', { error: e
, verificationId
});
1194 module
.exports
= DatabasePostgres
;