3 const common
= require('../../common');
4 const Database
= require('../base');
5 const DBErrors
= require('../errors');
6 const svh
= require('../schema-version-helper');
7 const SQLite
= require('better-sqlite3');
8 const fs
= require('fs');
9 const path
= require('path');
10 const { performance
} = require('perf_hooks');
12 const _fileScope
= common
.fileScope(__filename
);
14 const schemaVersionsSupported
= {
// Largest signed 64-bit integer (2^63 - 1), used as the "forever" epoch value
// handed to statements (see the `forever` binding in topicFetchComplete).
// Kept as a BigInt because the value is not exactly representable as a Number.
const EPOCH_FOREVER = BigInt('9223372036854775807');
/**
 * Convert an epoch timestamp in seconds to a native Date.
 * The connection enables defaultSafeIntegers, so integer columns arrive as
 * BigInt; the value is coerced through Number() before scaling to milliseconds.
 * NOTE(review): Number(null) is 0, so a NULL column becomes the 1970 epoch
 * rather than null, and EPOCH_FOREVER scales past the range a Date can hold,
 * yielding an Invalid Date — confirm callers expect both.
 * @param {Number|BigInt} epoch seconds since 1970-01-01T00:00:00Z
 * @returns {Date}
 */
const epochToDate = (epoch) => {
  const seconds = Number(epoch);
  return new Date(seconds * 1000);
};
/**
 * Convert a native Date to an epoch timestamp in whole seconds,
 * rounding the millisecond value to the nearest second.
 * @param {Date} date
 * @returns {Number} seconds since 1970-01-01T00:00:00Z
 */
const dateToEpoch = (date) => {
  const milliseconds = date.getTime();
  return Math.round(milliseconds / 1000);
};
32 class DatabaseSQLite
extends Database
{
33 constructor(logger
, options
) {
34 super(logger
, options
);
36 const connectionString
= options
.db
.connectionString
|| 'sqlite://:memory:';
37 const csDelim
= '://';
38 const dbFilename
= connectionString
.slice(connectionString
.indexOf(csDelim
) + csDelim
.length
);
40 const queryLogLevel
= options
.db
.queryLogLevel
;
42 const sqliteOptions
= {
43 ...(queryLogLevel
&& {
44 // eslint-disable-next-line security/detect-object-injection
45 verbose: (query
) => this.logger
[queryLogLevel
](_fileScope('SQLite:verbose'), '', { query
}),
48 this.db
= new SQLite(dbFilename
, sqliteOptions
);
49 this.schemaVersionsSupported
= schemaVersionsSupported
;
50 this.changesSinceLastOptimize
= BigInt(0);
51 this.optimizeAfterChanges
= options
.db
.connectionString
.optimizeAfterChanges
;
52 this.db
.pragma('foreign_keys = on'); // Enforce consistency.
53 this.db
.pragma('journal_mode = WAL'); // Be faster, expect local filesystem.
54 this.db
.defaultSafeIntegers(true); // This probably isn't necessary, but by using these BigInts we keep weird floats out of the query logs.
57 this._initStatements();
62 * SQLite cannot prepare its statements without a schema, ensure such exists.
65 const _scope
= _fileScope('_initTables');
67 // Migrations rely upon this table, ensure it exists.
68 const metaVersionTable
= '_meta_schema_version';
69 const tableExists
= this.db
.prepare('SELECT name FROM sqlite_master WHERE type=:type AND name=:name').pluck(true).bind({ type: 'table', name: metaVersionTable
});
70 let metaExists
= tableExists
.get();
71 if (metaExists
=== undefined) {
72 const fPath
= path
.join(__dirname
, 'sql', 'schema', 'init.sql');
73 // eslint-disable-next-line security/detect-non-literal-fs-filename
74 const fSql
= fs
.readFileSync(fPath
, { encoding: 'utf8' });
76 metaExists
= tableExists
.get();
77 /* istanbul ignore if */
78 if (metaExists
=== undefined) {
79 throw new DBErrors
.UnexpectedResult(`did not create ${metaVersionTable} table`);
81 this.logger
.info(_scope
, 'created schema version table', { metaVersionTable
});
85 const currentSchema
= this._currentSchema();
86 const migrationsWanted
= svh
.unappliedSchemaVersions(__dirname
, currentSchema
, this.schemaVersionsSupported
);
87 this.logger
.debug(_scope
, 'schema migrations wanted', { migrationsWanted
});
88 migrationsWanted
.forEach((v
) => {
89 const fPath
= path
.join(__dirname
, 'sql', 'schema', v
, 'apply.sql');
91 // eslint-disable-next-line security/detect-non-literal-fs-filename
92 const fSql
= fs
.readFileSync(fPath
, { encoding: 'utf8' });
93 this.logger
.debug(_scope
, 'applying migration', { version: v
});
94 const results
= this.db
.exec(fSql
);
95 this.logger
.debug(_scope
, 'migration results', { results
});
96 this.logger
.info(_scope
, 'applied migration', { version: v
});
98 this.logger
.error(_scope
, 'migration failed', { error: e
, fPath
, version: v
});
106 const _scope
= _fileScope('_initStatements');
107 const sqlDir
= path
.join(__dirname
, 'sql');
110 // Decorate the statement calls we use with timing and logging.
111 const wrapFetch
= (logName
, statementName
, fn
) => {
112 const _wrapScope
= _fileScope(logName
);
113 return (...args
) => {
114 const startTimestampMs
= performance
.now();
115 const rows
= fn(...args
);
116 DatabaseSQLite
._deOphidiate(rows
);
117 const elapsedTimeMs
= performance
.now() - startTimestampMs
;
118 this.logger
.debug(_wrapScope
, 'complete', { statementName
, elapsedTimeMs
});
122 const wrapRun
= (logName
, statementName
, fn
) => {
123 const _wrapScope
= _fileScope(logName
);
124 return (...args
) => {
125 const startTimestampMs
= performance
.now();
126 const result
= fn(...args
);
127 const elapsedTimeMs
= performance
.now() - startTimestampMs
;
128 this.logger
.debug(_wrapScope
, 'complete', { ...result
, statementName
, elapsedTimeMs
});
129 result
.duration
= elapsedTimeMs
;
134 // eslint-disable-next-line security/detect-non-literal-fs-filename
135 for (const f
of fs
.readdirSync(sqlDir
)) {
136 const fPath
= path
.join(sqlDir
, f
);
137 const { name: fName
, ext: fExt
} = path
.parse(f
);
138 // eslint-disable-next-line security/detect-non-literal-fs-filename
139 const stat
= fs
.statSync(fPath
);
141 || fExt
.toLowerCase() !== '.sql') {
144 // eslint-disable-next-line security/detect-non-literal-fs-filename
145 const fSql
= fs
.readFileSync(fPath
, { encoding: 'utf8' });
146 const statementName
= Database
._camelfy(fName
.toLowerCase(), '-');
149 statement
= this.db
.prepare(fSql
);
151 /* istanbul ignore next */
152 this.logger
.error(_scope
, 'failed to prepare statement', { error: e
, file: f
});
153 /* istanbul ignore next */
156 // eslint-disable-next-line security/detect-object-injection
157 this.statement
[statementName
] = statement
;
158 const { get: origGet
, all: origAll
, run: origRun
} = statement
;
159 statement
.get = wrapFetch('SQLite:get', statementName
, origGet
.bind(statement
));
160 statement
.all
= wrapFetch('SQLite:all', statementName
, origAll
.bind(statement
));
161 statement
.run
= wrapRun('SQLite:run', statementName
, origRun
.bind(statement
));
163 this.statement
._optimize
= this.db
.prepare('SELECT * FROM pragma_optimize(0x03)');
165 this.logger
.debug(_scope
, 'statements initialized', { statements: Object
.keys(this.statement
).length
});
169 static _deOphidiate(rows
) {
170 const rowsIsArray
= Array
.isArray(rows
);
174 const exemplaryRow
= rows
[0];
175 for (const prop
in exemplaryRow
) {
176 const camel
= Database
._camelfy(prop
);
177 if (!(camel
in exemplaryRow
)) {
178 for (const d
of rows
) {
179 // eslint-disable-next-line security/detect-object-injection
181 // eslint-disable-next-line security/detect-object-injection
186 return rowsIsArray
? rows : rows
[0];
191 return this.db
.prepare('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1').get();
196 const _scope
= _fileScope('healthCheck');
197 this.logger
.debug(_scope
, 'called', {});
199 throw new DBErrors
.UnexpectedResult('database is not open');
201 return { open: this.db
.open
};
205 _engineInfo(result
) {
206 if (result
.changes
) {
207 this.changesSinceLastOptimize
+= BigInt(result
.changes
);
211 changes: Number(result
.changes
),
212 lastInsertRowid: result
.lastInsertRowid
,
223 const _scope
= _fileScope('_optimize');
225 if (this.optimizeAfterChanges
226 && this.changesSinceLastOptimize
>= this.optimizeAfterChanges
) {
227 const optimize
= this.statement
._optimize
.all();
228 this.logger
.debug(_scope
, 'optimize', { optimize
});
229 this.db
.pragma('optimize');
230 this.changesSinceLastOptimize
= BigInt(0);
235 _purgeTables(really
) {
239 'topic_fetch_in_progress',
241 'verification_in_progress',
243 'subscription_delivery_in_progress',
245 const result
= this.db
.prepare(`DELETE FROM ${table}`).run();
246 this.logger
.debug(_fileScope('_purgeTables'), 'success', { table
, result
});
257 transaction(dbCtx
, fn
) {
258 dbCtx
= dbCtx
|| this.db
;
259 return dbCtx
.transaction(fn
)();
263 authenticationSuccess(dbCtx
, identifier
) {
264 const _scope
= _fileScope('authenticationSuccess');
265 this.logger
.debug(_scope
, 'called', { identifier
});
269 result
= this.statement
.authenticationSuccess
.run({ identifier
});
270 if (result
.changes
!= 1) {
271 throw new DBErrors
.UnexpectedResult('did not update authentication success');
274 this.logger
.error(_scope
, 'failed', { error: e
, identifier
});
280 authenticationGet(dbCtx
, identifier
) {
281 const _scope
= _fileScope('authenticationGet');
282 this.logger
.debug(_scope
, 'called', { identifier
});
285 return this.statement
.authenticationGet
.get({ identifier
});
287 this.logger
.error(_scope
, 'failed', { error: e
, identifier
});
293 authenticationUpsert(dbCtx
, identifier
, credential
) {
294 const _scope
= _fileScope('authenticationUpsert');
295 const scrubbedCredential
= '*'.repeat((credential
|| '').length
);
296 this.logger
.debug(_scope
, 'called', { identifier
, scrubbedCredential
});
300 result
= this.statement
.authenticationUpsert
.run({ identifier
, credential
});
301 if (result
.changes
!= 1) {
302 throw new DBErrors
.UnexpectedResult('did not upsert authentication');
305 this.logger
.error(_scope
, 'failed', { error: e
, identifier
, scrubbedCredential
})
312 * Converts engine subscription fields to native types.
313 * @param {Object} data
315 static _subscriptionDataToNative(data
) {
317 ['created', 'verified', 'expires', 'contentDelivered'].forEach((field
) => {
318 // eslint-disable-next-line security/detect-object-injection
319 data
[field
] = epochToDate(data
[field
]);
326 subscriptionsByTopicId(dbCtx
, topicId
) {
327 const _scope
= _fileScope('subscriptionsByTopicId');
328 this.logger
.debug(_scope
, 'called', { topicId
});
331 const subscriptions
= this.statement
.subscriptionsByTopicId
.all({ topicId
});
332 return subscriptions
.map((s
) => DatabaseSQLite
._subscriptionDataToNative(s
));
334 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
340 subscriptionCountByTopicUrl(dbCtx
, topicUrl
) {
341 const _scope
= _fileScope('subscriptionCountByTopicUrl');
342 this.logger
.debug(_scope
, 'called', { topicUrl
});
345 return this.statement
.subscriptionCountByTopicUrl
.get({ topicUrl
});
347 this.logger
.error(_scope
, 'failed', { error: e
, topicUrl
});
353 subscriptionDelete(dbCtx
, callback
, topicId
) {
354 const _scope
= _fileScope('subscriptionDelete');
355 this.logger
.debug(_scope
, 'called', { callback
, topicId
});
358 const result
= this.statement
.subscriptionDelete
.run({ callback
, topicId
});
359 if (result
.changes
!= 1) {
360 throw new DBErrors
.UnexpectedResult('did not delete subscription');
362 return this._engineInfo(result
);
364 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
});
370 subscriptionDeleteExpired(dbCtx
, topicId
) {
371 const _scope
= _fileScope('subscriptionDeleteExpired');
372 this.logger
.debug(_scope
, 'called', { topicId
});
375 const result
= this.statement
.subscriptionDeleteExpired
.run({ topicId
});
376 this.logger
.debug(_scope
, 'success', { topicId
, deleted: result
.changes
});
377 return this._engineInfo(result
);
379 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
385 subscriptionDeliveryClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
386 const _scope
= _fileScope('subscriptionDeliveryClaim');
387 this.logger
.debug(_scope
, 'called', { wanted
, claimTimeoutSeconds
, claimant
});
391 this.db
.transaction(() => {
392 subscriptionIds
= this.statement
.subscriptionDeliveryNeeded
.all({ wanted
}).map((claim
) => claim
.id
);
393 subscriptionIds
.forEach((subscriptionId
) => {
394 const result
= this.statement
.subscriptionDeliveryClaimById
.run({ subscriptionId
, claimTimeoutSeconds
, claimant
});
395 if (result
.changes
!= 1) {
396 throw new DBErrors
.UnexpectedResult('did not claim subscription delivery');
400 return subscriptionIds
;
402 this.logger
.error(_scope
, 'failed', { error: e
, wanted
, claimTimeoutSeconds
, claimant
, subscriptionIds
});
408 subscriptionDeliveryClaimById(dbCtx
, subscriptionId
, claimTimeoutSeconds
, claimant
) {
409 const _scope
= _fileScope('subscriptionDeliveryClaimById');
410 this.logger
.debug(_scope
, 'called', { subscriptionId
, claimTimeoutSeconds
, claimant
});
413 const result
= this.statement
.subscriptionDeliveryClaimById
.run({ subscriptionId
, claimTimeoutSeconds
, claimant
});
414 if (result
.changes
!= 1) {
415 throw new DBErrors
.UnexpectedResult('did not claim subscription delivery');
417 return this._engineInfo(result
);
419 this.logger
.error(_scope
, 'failed', { error: e
, subscriptionId
, claimTimeoutSeconds
, claimant
});
425 subscriptionDeliveryComplete(dbCtx
, callback
, topicId
, topicContentUpdated
) {
426 const _scope
= _fileScope('subscriptionDeliveryComplete');
427 this.logger
.debug(_scope
, 'called', { callback
, topicId
, topicContentUpdated
});
431 this.db
.transaction(() => {
432 topicContentUpdated
= dateToEpoch(topicContentUpdated
);
433 result
= this.statement
.subscriptionDeliverySuccess
.run({ callback
, topicId
, topicContentUpdated
});
434 if (result
.changes
!= 1) {
435 throw new DBErrors
.UnexpectedResult('did not set subscription delivery success');
437 result
= this.statement
.subscriptionDeliveryDone
.run({ callback
, topicId
});
438 if (result
.changes
!= 1) {
439 throw new DBErrors
.UnexpectedResult('did not complete subscription delivery');
442 return this._engineInfo(result
);
444 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
, topicContentUpdated
});
450 subscriptionDeliveryGone(dbCtx
, callback
, topicId
) {
451 const _scope
= _fileScope('subscriptionDeliveryGone');
452 this.logger
.debug(_scope
, 'called', { callback
, topicId
});
456 this.db
.transaction(() => {
457 result
= this.statement
.subscriptionDelete
.run({ callback
, topicId
});
458 if (result
.changes
!= 1) {
459 throw new DBErrors
.UnexpectedResult('did not delete subscription');
461 // Delete cascades to delivery
462 // result = this.statement.subscriptionDeliveryDone.run({ callback, topicId });
463 // if (result.changes != 1) {
464 // throw new DBErrors.UnexpectedResult('did not complete subscription delivery');
467 return this._engineInfo(result
);
469 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
});
475 subscriptionDeliveryIncomplete(dbCtx
, callback
, topicId
, retryDelays
= [60]) {
476 const _scope
= _fileScope('subscriptionDeliveryIncomplete');
477 this.logger
.debug(_scope
, 'called', { callback
, topicId
, retryDelays
});
481 this.db
.transaction(() => {
482 const { currentAttempt
} = this.statement
.subscriptionDeliveryAttempts
.get({ callback
, topicId
});
483 const nextAttemptDelaySeconds
= common
.attemptRetrySeconds(currentAttempt
, retryDelays
);
484 result
= this.statement
.subscriptionDeliveryFailure
.run({ nextAttemptDelaySeconds
, callback
, topicId
});
485 if (result
.changes
!= 1) {
486 throw new DBErrors
.UnexpectedResult('did not set delivery failure');
488 result
= this.statement
.subscriptionDeliveryDone
.run({ callback
, topicId
});
489 if (result
.changes
!= 1) {
490 throw new DBErrors
.UnexpectedResult('did not complete subscription delivery');
494 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
});
500 subscriptionGet(dbCtx
, callback
, topicId
) {
501 const _scope
= _fileScope('subscriptionGet');
502 this.logger
.debug(_scope
, 'called', { callback
, topicId
});
506 subscription
= this.statement
.subscriptionGet
.get({ callback
, topicId
});
507 return DatabaseSQLite
._subscriptionDataToNative(subscription
);
509 this.logger
.error(_scope
, 'failed', { error: e
, callback
, topicId
});
515 subscriptionGetById(dbCtx
, subscriptionId
) {
516 const _scope
= _fileScope('subscriptionGetById');
517 this.logger
.debug(_scope
, 'called', { subscriptionId
});
521 subscription
= this.statement
.subscriptionGetById
.get({ subscriptionId
});
522 return DatabaseSQLite
._subscriptionDataToNative(subscription
);
524 this.logger
.error(_scope
, 'failed', { error: e
, subscriptionId
});
530 subscriptionUpdate(dbCtx
, data
) {
531 const _scope
= _fileScope('subscriptionUpdate');
532 this.logger
.debug(_scope
, 'called', { data
});
534 const subscriptionData
= {
538 this._subscriptionUpdateDataValidate(subscriptionData
);
541 const result
= this.statement
.subscriptionUpdate
.run(subscriptionData
);
542 if (result
.changes
!= 1) {
543 throw new DBErrors
.UnexpectedResult('did not update subscription');
546 this.logger
.error(_scope
, 'failed', { error: e
, subscriptionData
});
552 subscriptionUpsert(dbCtx
, data
) {
553 const _scope
= _fileScope('subscriptionUpsert');
554 this.logger
.debug(_scope
, 'called', { ...data
});
556 const subscriptionData
= {
558 httpRemoteAddr: null,
562 this._subscriptionUpsertDataValidate(subscriptionData
);
566 result
= this.statement
.subscriptionUpsert
.run(subscriptionData
);
567 if (result
.changes
!= 1) {
568 throw new DBErrors
.UnexpectedResult('did not upsert subscription');
570 return this._engineInfo(result
);
572 this.logger
.error(_scope
, 'failed', { error: e
, subscriptionData
});
578 topicDeleted(dbCtx
, topicId
) {
579 const _scope
= _fileScope('topicDeleted');
580 this.logger
.debug(_scope
, 'called', { topicId
});
584 result
= this.statement
.topicDeleted
.run({ topicId
});
585 if (result
.changes
!= 1) {
586 throw new DBErrors
.UnexpectedResult('did not update topic as deleted');
589 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
595 topicFetchClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
596 const _scope
= _fileScope('topicFetchClaim');
597 this.logger
.debug(_scope
, 'called', { wanted
, claimTimeoutSeconds
});
601 this.db
.transaction(() => {
602 topicIds
= this.statement
.topicContentFetchNeeded
.all({ wanted
}).map((claim
) => claim
.id
);
603 topicIds
.forEach((topicId
) => {
604 const result
= this.statement
.topicContentFetchClaimById
.run({ topicId
, claimTimeoutSeconds
, claimant
});
605 if (result
.changes
!= 1) {
606 throw new DBErrors
.UnexpectedResult('did not claim topic fetch');
612 this.logger
.error(_scope
, 'failed to claim topics for fetch', { error: e
, wanted
, claimTimeoutSeconds
, claimant
, topicIds
});
618 topicFetchClaimById(dbCtx
, topicId
, claimTimeoutSeconds
, claimant
) {
619 const _scope
= _fileScope('topicFetchClaimById');
620 this.logger
.debug(_scope
, 'called', { topicId
, claimTimeoutSeconds
, claimant
});
624 result
= this.statement
.topicContentFetchClaimById
.run({ topicId
, claimTimeoutSeconds
, claimant
});
625 if (result
.changes
!= 1) {
626 throw new DBErrors
.UnexpectedResult('did not claim topic fetch');
628 return this._engineInfo(result
);
630 this.logger
.error(_scope
, 'failed to claim topics for fetch', { error: e
, topicId
, claimTimeoutSeconds
, claimant
});
636 topicFetchComplete(dbCtx
, topicId
) {
637 const _scope
= _fileScope('topicFetchComplete');
638 this.logger
.debug(_scope
, 'called', { topicId
});
642 this.db
.transaction(() => {
643 result
= this.statement
.topicAttemptsReset
.run({ topicId
, forever: EPOCH_FOREVER
});
644 if (result
.changes
!= 1) {
645 throw new DBErrors
.UnexpectedResult('did not reset topic attempts');
647 result
= this.statement
.topicContentFetchDone
.run({ topicId
});
648 if (result
.changes
!= 1) {
649 throw new DBErrors
.UnexpectedResult('did not release topic fetch');
652 return this._engineInfo(result
);
654 this.logger
.error(_scope
, 'failed', { error: e
, result
, topicId
});
660 topicFetchIncomplete(dbCtx
, topicId
, retryDelays
= [60]) {
661 const _scope
= _fileScope('topicFetchIncomplete');
662 this.logger
.debug(_scope
, 'called', { topicId
});
666 this.db
.transaction(() => {
667 const { contentFetchAttemptsSinceSuccess: currentAttempt
} = this.statement
.topicAttempts
.get({ topicId
});
668 const nextAttemptDelaySeconds
= common
.attemptRetrySeconds(currentAttempt
, retryDelays
);
669 result
= this.statement
.topicAttemptsIncrement
.run({ topicId
, nextAttemptDelaySeconds
});
670 if (result
.changes
!= 1) {
671 throw new DBErrors
.UnexpectedResult('did not set topic attempts');
673 result
= this.statement
.topicContentFetchDone
.run({ topicId
});
674 if (result
.changes
!= 1) {
675 throw new DBErrors
.UnexpectedResult('did not release topic fetch');
679 return this._engineInfo(result
);
681 this.logger
.error(_scope
, 'failed', { error: e
, result
, topicId
});
687 topicFetchRequested(dbCtx
, topicId
) {
688 const _scope
= _fileScope('topicFetchRequested');
689 this.logger
.debug(_scope
, 'called', { topicId
});
693 result
= this.statement
.topicContentFetchRequested
.run({ topicId
});
694 if (result
.changes
!= 1) {
695 throw new DBErrors
.UnexpectedResult('did not set topic fetch requested');
697 return this._engineInfo(result
);
699 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
706 * Converts engine topic fields to native types.
707 * @param {Object} data
709 static _topicDataToNative(data
) {
711 data
.isActive
= !!data
.isActive
;
712 data
.isDeleted
= !!data
.isDeleted
;
713 ['created', 'lastPublish', 'contentFetchNextAttempt', 'contentUpdated'].forEach((field
) => {
714 // eslint-disable-next-line security/detect-object-injection
715 data
[field
] = epochToDate(data
[field
]);
722 // eslint-disable-next-line no-unused-vars
724 const _scope
= _fileScope('topicGetAll');
725 this.logger
.debug(_scope
, 'called');
729 topics
= this.statement
.topicGetInfoAll
.all();
731 this.logger
.error(_scope
, 'failed', { error: e
, topics
});
736 .map(DatabaseSQLite
._topicDataToNative
)
737 .map(this._topicDefaults
.bind(this));
743 topicGetById(dbCtx
, topicId
, applyDefaults
= true) {
744 const _scope
= _fileScope('topicGetById');
745 this.logger
.debug(_scope
, 'called', { topicId
});
749 topic
= this.statement
.topicGetById
.get({ topicId
});
750 DatabaseSQLite
._topicDataToNative(topic
);
752 topic
= this._topicDefaults(topic
);
756 this.logger
.error(_scope
, 'failed', { error: e
, topic
, topicId
});
762 topicGetByUrl(dbCtx
, topicUrl
) {
763 const _scope
= _fileScope('topicGetByUrl');
764 this.logger
.debug(_scope
, 'called', { topicUrl
});
768 topic
= this.statement
.topicGetByUrl
.get({ topicUrl
});
769 DatabaseSQLite
._topicDataToNative(topic
);
770 return this._topicDefaults(topic
);
772 this.logger
.error(_scope
, 'failed', { error: e
, topic
, topicUrl
});
778 topicGetContentById(dbCtx
, topicId
) {
779 const _scope
= _fileScope('topicGetContentById');
780 this.logger
.debug(_scope
, 'called', { topicId
});
784 topic
= this.statement
.topicGetContentById
.get({ topicId
});
785 DatabaseSQLite
._topicDataToNative(topic
);
786 return this._topicDefaults(topic
);
788 this.logger
.error(_scope
, 'failed', { error: e
, topic
, topicId
});
794 topicPendingDelete(dbCtx
, topicId
) {
795 const _scope
= _fileScope('topicPendingDelete');
796 this.logger
.debug(_scope
, 'called', { topicId
});
799 this.db
.transaction(() => {
800 const topic
= this.statement
.topicGetById
.get({ topicId
});
801 if (!topic
.isDeleted
) {
802 this.logger
.debug(_scope
, 'topic not set deleted, not deleting', { topicId
});
806 const { count: subscriberCount
} = this.statement
.subscriptionCountByTopicUrl
.get({ topicUrl: topic
.url
});
807 if (subscriberCount
) {
808 this.logger
.debug(_scope
, 'topic has subscribers, not deleting', { topicId
, subscriberCount
});
812 const result
= this.statement
.topicDeleteById
.run({ topicId
});
813 if (result
.changes
!== 1) {
814 throw new DBErrors
.UnexpectedResult('did not delete topic');
817 this.logger
.debug(_scope
, 'success', { topicId
});
819 this.logger
.error(_scope
, 'failed', { error: e
, topicId
});
825 topicPublishHistory(dbCtx
, topicId
, days
) {
826 const _scope
= _fileScope('topicPublishHistory');
827 this.logger
.debug(_scope
, 'called', { topicId
, days
})
829 const events
= this.statement
.topicPublishHistory
.all({ topicId
, daysAgo: days
});
830 const history
= Array
.from({ length: days
}, () => 0);
831 events
.forEach(({ daysAgo
, contentUpdates
}) => history
[daysAgo
] = Number(contentUpdates
));
837 topicSet(dbCtx
, data
) {
838 const _scope
= _fileScope('topicSet');
839 this.logger
.debug(_scope
, 'called', data
);
841 const topicSetData
= {
842 publisherValidationUrl: null,
843 leaseSecondsPreferred: null,
844 leaseSecondsMin: null,
845 leaseSecondsMax: null,
851 this._topicSetDataValidate(topicSetData
);
852 result
= this.statement
.topicUpsert
.run(topicSetData
);
853 if (result
.changes
!= 1) {
854 throw new DBErrors
.UnexpectedResult('did not set topic data');
856 return this._engineInfo(result
);
858 this.logger
.error(_scope
, 'failed', { error: e
, result
});
864 topicSetContent(dbCtx
, data
) {
865 const _scope
= _fileScope('topicSetContent');
866 const topicSetContentData
= {
869 httpLastModified: null,
873 ...topicSetContentData
,
874 content: common
.logTruncate(topicSetContentData
.content
, 100),
876 this.logger
.debug(_scope
, 'called', logData
);
880 this._topicSetContentDataValidate(topicSetContentData
);
881 result
= this.statement
.topicSetContent
.run(topicSetContentData
);
882 logData
.result
= result
;
883 if (result
.changes
!= 1) {
884 throw new DBErrors
.UnexpectedResult('did not set topic content');
886 result
= this.statement
.topicSetContentHistory
.run({
887 topicId: data
.topicId
,
888 contentHash: data
.contentHash
,
889 contentSize: data
.content
.length
,
891 if (result
.changes
!= 1) {
892 throw new DBErrors
.UnexpectedResult('did not set topic content history');
894 return this._engineInfo(result
);
896 this.logger
.error(_scope
, 'failed', { error: e
, ...logData
});
902 topicUpdate(dbCtx
, data
) {
903 const _scope
= _fileScope('topicUpdate');
904 this.logger
.debug(_scope
, 'called', { data
});
907 leaseSecondsPreferred: null,
908 leaseSecondsMin: null,
909 leaseSecondsMax: null,
910 publisherValidationUrl: null,
914 this._topicUpdateDataValidate(topicData
);
917 const result
= this.statement
.topicUpdate
.run(topicData
);
918 if (result
.changes
!= 1) {
919 throw new DBErrors
.UnexpectedResult('did not update topic');
922 this.logger
.error(_scope
, 'failed', { error: e
, topicData
});
928 verificationClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
929 const _scope
= _fileScope('verificationClaim');
930 this.logger
.debug(_scope
, 'called', { wanted
, claimTimeoutSeconds
});
934 this.db
.transaction(() => {
935 verificationIds
= this.statement
.verificationNeeded
.all({ wanted
}).map((claim
) => claim
.id
);
936 verificationIds
.forEach((verificationId
) => {
937 const result
= this.statement
.verificationClaimById
.run({ verificationId
, claimTimeoutSeconds
, claimant
});
938 if (result
.changes
!= 1) {
939 throw new DBErrors
.UnexpectedResult('did not claim verification');
943 return verificationIds
;
945 this.logger
.error(_scope
, 'failed to claim verifications', { wanted
, claimTimeoutSeconds
});
951 verificationClaimById(dbCtx
, verificationId
, claimTimeoutSeconds
, claimant
) {
952 const _scope
= _fileScope('verificationClaimById');
953 this.logger
.debug(_scope
, 'called', { verificationId
, claimTimeoutSeconds
, claimant
});
957 result
= this.statement
.verificationClaimById
.run({ verificationId
, claimTimeoutSeconds
, claimant
});
958 if (result
.changes
!= 1) {
959 throw new DBErrors
.UnexpectedResult('did not claim verification');
961 return this._engineInfo(result
);
963 this.logger
.error(_scope
, 'failed to claim verification', { error: e
, verificationId
, claimTimeoutSeconds
, claimant
});
969 verificationComplete(dbCtx
, verificationId
, callback
, topicId
) {
970 const _scope
= _fileScope('verificationComplete');
971 this.logger
.debug(_scope
, 'called', { verificationId
});
975 this.db
.transaction(() => {
976 result
= this.statement
.verificationScrub
.run({ verificationId
, callback
, topicId
});
977 if (result
.changes
< 1) {
978 throw new DBErrors
.UnexpectedResult('did not remove verifications');
982 this.logger
.error(_scope
, 'failed', { verificationId
});
985 return this._engineInfo(result
);
990 * Converts engine verification fields to native types.
991 * @param {Object} data
993 static _verificationDataToNative(data
) {
995 data
.isPublisherValidated
= !!data
.isPublisherValidated
;
1000 verificationGetById(dbCtx
, verificationId
) {
1001 const _scope
= _fileScope('verificationGetById');
1002 this.logger
.debug(_scope
, 'called', { verificationId
});
1006 verification
= this.statement
.verificationGetById
.get({ verificationId
});
1007 DatabaseSQLite
._verificationDataToNative(verification
);
1008 return verification
;
1010 this.logger
.error(_scope
, 'failed', { error: e
, verificationId
});
1016 verificationIncomplete(dbCtx
, verificationId
, retryDelays
= [60]) {
1017 const _scope
= _fileScope('verificationIncomplete');
1018 this.logger
.debug(_scope
, 'called', { verificationId
});
1022 this.db
.transaction(() => {
1023 const { attempts: currentAttempt
} = this.statement
.verificationAttempts
.get({ verificationId
});
1024 const nextAttemptDelaySeconds
= common
.attemptRetrySeconds(currentAttempt
, retryDelays
);
1025 result
= this.statement
.verificationAttemptsIncrement
.run({ verificationId
, nextAttemptDelaySeconds
});
1026 if (result
.changes
!= 1) {
1027 throw new DBErrors
.UnexpectedResult('did not increment verification attempts');
1029 result
= this.statement
.verificationDone
.run({ verificationId
});
1030 if (result
.changes
!= 1) {
1031 throw new DBErrors
.UnexpectedResult('did not release verification in progress');
1035 return this._engineInfo(result
);
1037 this.logger
.error(_scope
, 'failed', { error: e
, result
, verificationId
});
1044 * Convert native verification fields to engine types.
1046 static _verificationDataToEngine(data
) {
1048 data
.isPublisherValidated
= data
.isPublisherValidated
? 1 : 0;
1053 verificationInsert(dbCtx
, verification
) {
1054 const _scope
= _fileScope('verificationInsert');
1055 this.logger
.debug(_scope
, 'called', { verification
});
1057 const verificationData
= {
1059 httpRemoteAddr: null,
1065 let result
, verificationId
;
1067 this._verificationDataValidate(verificationData
);
1068 DatabaseSQLite
._verificationDataToEngine(verificationData
);
1069 result
= this.statement
.verificationInsert
.run(verificationData
);
1070 if (result
.changes
!= 1) {
1071 throw new DBErrors
.UnexpectedResult('did not insert verification');
1073 verificationId
= result
.lastInsertRowid
;
1074 this.logger
.debug(_scope
, 'inserted verification', { verificationId
});
1076 return verificationId
;
1078 this.logger
.error(_scope
, 'failed', { error: e
, verificationData
});
1084 verificationRelease(dbCtx
, verificationId
) {
1085 const _scope
= _fileScope('verificationRelease');
1086 this.logger
.debug(_scope
, 'called', { verificationId
});
1090 result
= this.statement
.verificationDone
.run({ verificationId
});
1091 if (result
.changes
!= 1) {
1092 throw new DBErrors
.UnexpectedResult('did not release verification');
1095 this.logger
.error(_scope
, 'failed', { error: e
, verificationId
});
1101 verificationUpdate(dbCtx
, verificationId
, data
) {
1102 const _scope
= _fileScope('verificationUpdate');
1103 this.logger
.debug(_scope
, 'called', { verificationId
, data
});
1105 const verificationData
= {
1113 this._verificationUpdateDataValidate(verificationData
);
1114 DatabaseSQLite
._verificationDataToEngine(verificationData
);
1115 result
= this.statement
.verificationUpdate
.run(verificationData
);
1116 if (result
.changes
!= 1) {
1117 throw new DBErrors
.UnexpectedResult('did not update verification');
1120 this.logger
.error(_scope
, 'failed', { error: e
, verificationData
});
1126 verificationValidated(dbCtx
, verificationId
) {
1127 const _scope
= _fileScope('verificationValidated');
1128 this.logger
.debug(_scope
, 'called', { verificationId
});
1132 result
= this.statement
.verificationValidate
.run({ verificationId
});
1133 if (result
.changes
!= 1) {
1134 throw new DBErrors
.UnexpectedResult('did not set verification validation');
1137 this.logger
.error(_scope
, 'failed', { error: e
, verificationId
});
// The SQLite engine class is this module's sole export.
module.exports = DatabaseSQLite;