fba4e7ca326f828a6f9698258d2ec4628fb7665e
[websub-hub] / src / db / sqlite / index.js
1 'use strict';
2
3 const common = require('../../common');
4 const Database = require('../base');
5 const DBErrors = require('../errors');
6 const svh = require('../schema-version-helper');
7 const SQLite = require('better-sqlite3');
8 const fs = require('fs');
9 const path = require('path');
10 const { performance } = require('perf_hooks');
11
12 const _fileScope = common.fileScope(__filename);
13
14 const schemaVersionsSupported = {
15 min: {
16 major: 1,
17 minor: 0,
18 patch: 0,
19 },
20 max: {
21 major: 1,
22 minor: 0,
23 patch: 1,
24 },
25 };
26
27 // max of signed int64 (2^63 - 1), should be enough
28 const EPOCH_FOREVER = BigInt('9223372036854775807');
29
30 class DatabaseSQLite extends Database {
31 constructor(logger, options) {
32 super(logger, options);
33
34 const connectionString = options.db.connectionString || 'sqlite://:memory:';
35 const csDelim = '://';
36 const dbFilename = connectionString.slice(connectionString.indexOf(csDelim) + csDelim.length);
37
38 const queryLogLevel = options.db.queryLogLevel;
39
40 const sqliteOptions = {
41 ...(queryLogLevel && {
42 // eslint-disable-next-line security/detect-object-injection
43 verbose: (query) => this.logger[queryLogLevel](_fileScope('SQLite:verbose'), '', { query }),
44 }),
45 };
46 this.db = new SQLite(dbFilename, sqliteOptions);
47 this.schemaVersionsSupported = schemaVersionsSupported;
48 this.changesSinceLastOptimize = BigInt(0);
49 this.optimizeAfterChanges = options.db.connectionString.optimizeAfterChanges;
50 this.db.pragma('foreign_keys = on'); // Enforce consistency.
51 this.db.pragma('journal_mode = WAL'); // Be faster, expect local filesystem.
52 this.db.defaultSafeIntegers(true); // This probably isn't necessary, but by using these BigInts we keep weird floats out of the query logs.
53
54 this._initTables();
55 this._initStatements();
56 }
57
58
59 /**
60 * SQLite cannot prepare its statements without a schema, ensure such exists.
61 */
62 _initTables() {
63 const _scope = _fileScope('_initTables');
64
65 // Migrations rely upon this table, ensure it exists.
66 const metaVersionTable = '_meta_schema_version';
67 const tableExists = this.db.prepare('SELECT name FROM sqlite_master WHERE type=:type AND name=:name').pluck(true).bind({ type: 'table', name: metaVersionTable });
68 let metaExists = tableExists.get();
69 if (metaExists === undefined) {
70 const fPath = path.join(__dirname, 'sql', 'schema', 'init.sql');
71 // eslint-disable-next-line security/detect-non-literal-fs-filename
72 const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
73 this.db.exec(fSql);
74 metaExists = tableExists.get();
75 /* istanbul ignore if */
76 if (metaExists === undefined) {
77 throw new DBErrors.UnexpectedResult(`did not create ${metaVersionTable} table`);
78 }
79 this.logger.info(_scope, 'created schema version table', { metaVersionTable });
80 }
81
82 // Apply migrations
83 const currentSchema = this._currentSchema();
84 const migrationsWanted = svh.unappliedSchemaVersions(__dirname, currentSchema, this.schemaVersionsSupported);
85 this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
86 migrationsWanted.forEach((v) => {
87 const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
88 // eslint-disable-next-line security/detect-non-literal-fs-filename
89 const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
90 this.logger.info(_scope, 'applying migration', { version: v });
91 this.db.exec(fSql);
92 });
93 }
94
95
96 _initStatements() {
97 const _scope = _fileScope('_initStatements');
98 const sqlDir = path.join(__dirname, 'sql');
99 this.statement = {};
100
101 // Decorate the statement calls we use with timing and logging.
102 const wrapFetch = (logName, statementName, fn) => {
103 const _wrapScope = _fileScope(logName);
104 return (...args) => {
105 const startTimestampMs = performance.now();
106 const rows = fn(...args);
107 DatabaseSQLite._deOphidiate(rows);
108 const elapsedTimeMs = performance.now() - startTimestampMs;
109 this.logger.debug(_wrapScope, 'complete', { statementName, elapsedTimeMs });
110 return rows;
111 };
112 };
113 const wrapRun = (logName, statementName, fn) => {
114 const _wrapScope = _fileScope(logName);
115 return (...args) => {
116 const startTimestampMs = performance.now();
117 const result = fn(...args);
118 const elapsedTimeMs = performance.now() - startTimestampMs;
119 this.logger.debug(_wrapScope, 'complete', { ...result, statementName, elapsedTimeMs });
120 result.duration = elapsedTimeMs;
121 return result;
122 };
123 };
124
125 // eslint-disable-next-line security/detect-non-literal-fs-filename
126 for (const f of fs.readdirSync(sqlDir)) {
127 const fPath = path.join(sqlDir, f);
128 const { name: fName, ext: fExt } = path.parse(f);
129 // eslint-disable-next-line security/detect-non-literal-fs-filename
130 const stat = fs.statSync(fPath);
131 if (!stat.isFile()
132 || fExt.toLowerCase() !== '.sql') {
133 continue;
134 }
135 // eslint-disable-next-line security/detect-non-literal-fs-filename
136 const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
137 const statementName = Database._camelfy(fName.toLowerCase(), '-');
138 let statement;
139 try {
140 statement = this.db.prepare(fSql);
141 } catch (e) {
142 /* istanbul ignore next */
143 this.logger.error(_scope, 'failed to prepare statement', { error: e, file: f });
144 /* istanbul ignore next */
145 throw e;
146 }
147 // eslint-disable-next-line security/detect-object-injection
148 this.statement[statementName] = statement;
149 const { get: origGet, all: origAll, run: origRun } = statement;
150 statement.get = wrapFetch('SQLite:get', statementName, origGet.bind(statement));
151 statement.all = wrapFetch('SQLite:all', statementName, origAll.bind(statement));
152 statement.run = wrapRun('SQLite:run', statementName, origRun.bind(statement));
153 }
154 this.statement._optimize = this.db.prepare('SELECT * FROM pragma_optimize(0x03)');
155
156 this.logger.debug(_scope, 'statements initialized', { statements: Object.keys(this.statement).length });
157 }
158
159
160 static _deOphidiate(rows) {
161 const rowsIsArray = Array.isArray(rows);
162 if (!rowsIsArray) {
163 rows = [rows];
164 }
165 const exemplaryRow = rows[0];
166 for (const prop in exemplaryRow) {
167 const camel = Database._camelfy(prop);
168 if (!(camel in exemplaryRow)) {
169 for (const d of rows) {
170 // eslint-disable-next-line security/detect-object-injection
171 d[camel] = d[prop];
172 // eslint-disable-next-line security/detect-object-injection
173 delete d[prop];
174 }
175 }
176 }
177 return rowsIsArray ? rows : rows[0];
178 }
179
180
181 _currentSchema() {
182 return this.db.prepare('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1').get();
183 }
184
185
186 healthCheck() {
187 const _scope = _fileScope('healthCheck');
188 this.logger.debug(_scope, 'called', {});
189 if (!this.db.open) {
190 throw new DBErrors.UnexpectedResult('database is not open');
191 }
192 return { open: this.db.open };
193 }
194
195
196 _engineInfo(result) {
197 if (result.changes) {
198 this.changesSinceLastOptimize += BigInt(result.changes);
199 this._optimize();
200 }
201 return {
202 changes: Number(result.changes),
203 lastInsertRowid: result.lastInsertRowid,
204 };
205 }
206
207
208 _closeConnection() {
209 this.db.close();
210 }
211
212
213 _optimize() {
214 const _scope = _fileScope('_optimize');
215
216 if (this.optimizeAfterChanges
217 && this.changesSinceLastOptimize >= this.optimizeAfterChanges) {
218 const optimize = this.statement._optimize.all();
219 this.logger.debug(_scope, 'optimize', { optimize });
220 this.db.pragma('optimize');
221 this.changesSinceLastOptimize = BigInt(0);
222 }
223 }
224
225
226 _purgeTables(really) {
227 if (really) {
228 [
229 'topic',
230 'topic_fetch_in_progress',
231 'verification',
232 'verification_in_progress',
233 'subscription',
234 'subscription_delivery_in_progress',
235 ].map((table) => {
236 const result = this.db.prepare(`DELETE FROM ${table}`).run();
237 this.logger.debug(_fileScope('_purgeTables'), 'success', { table, result });
238 });
239 }
240 }
241
242
243 context(fn) {
244 return fn(this.db);
245 }
246
247
248 transaction(dbCtx, fn) {
249 dbCtx = dbCtx || this.db;
250 return dbCtx.transaction(fn)();
251 }
252
253
254 authenticationSuccess(dbCtx, identifier) {
255 const _scope = _fileScope('authenticationSuccess');
256 this.logger.debug(_scope, 'called', { identifier });
257
258 let result;
259 try {
260 result = this.statement.authenticationSuccess.run({ identifier });
261 if (result.changes != 1) {
262 throw new DBErrors.UnexpectedResult('did not update authentication success');
263 }
264 } catch (e) {
265 this.logger.error(_scope, 'failed', { error: e, identifier });
266 throw e;
267 }
268 }
269
270
271 authenticationGet(dbCtx, identifier) {
272 const _scope = _fileScope('authenticationGet');
273 this.logger.debug(_scope, 'called', { identifier });
274
275 try {
276 return this.statement.authenticationGet.get({ identifier });
277 } catch (e) {
278 this.logger.error(_scope, 'failed', { error: e, identifier });
279 throw e;
280 }
281 }
282
283
284 authenticationUpsert(dbCtx, identifier, credential) {
285 const _scope = _fileScope('authenticationUpsert');
286 const scrubbedCredential = '*'.repeat((credential || '').length);
287 this.logger.debug(_scope, 'called', { identifier, scrubbedCredential });
288
289 let result;
290 try {
291 result = this.statement.authenticationUpsert.run({ identifier, credential });
292 if (result.changes != 1) {
293 throw new DBErrors.UnexpectedResult('did not upsert authentication');
294 }
295 } catch (e) {
296 this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential })
297 throw e;
298 }
299 }
300
301
302 /**
303 * Converts engine subscription fields to native types.
304 * @param {Object} data
305 */
306 static _subscriptionDataToNative(data) {
307 const epochToDate = (epoch) => new Date(Number(epoch) * 1000);
308 if (data) {
309 ['created', 'verified', 'expires', 'contentDelivered'].forEach((field) => {
310 // eslint-disable-next-line security/detect-object-injection
311 data[field] = epochToDate(data[field]);
312 });
313 }
314 return data;
315 }
316
317
318 subscriptionsByTopicId(dbCtx, topicId) {
319 const _scope = _fileScope('subscriptionsByTopicId');
320 this.logger.debug(_scope, 'called', { topicId });
321
322 try {
323 const subscriptions = this.statement.subscriptionsByTopicId.all({ topicId });
324 return subscriptions.map((s) => DatabaseSQLite._subscriptionDataToNative(s));
325 } catch (e) {
326 this.logger.error(_scope, 'failed', { error: e, topicId });
327 throw e;
328 }
329 }
330
331
332 subscriptionCountByTopicUrl(dbCtx, topicUrl) {
333 const _scope = _fileScope('subscriptionCountByTopicUrl');
334 this.logger.debug(_scope, 'called', { topicUrl });
335
336 try {
337 return this.statement.subscriptionCountByTopicUrl.get({ topicUrl });
338 } catch (e) {
339 this.logger.error(_scope, 'failed', { error: e, topicUrl });
340 throw e;
341 }
342 }
343
344
345 subscriptionDelete(dbCtx, callback, topicId) {
346 const _scope = _fileScope('subscriptionDelete');
347 this.logger.debug(_scope, 'called', { callback, topicId });
348
349 try {
350 const result = this.statement.subscriptionDelete.run({ callback, topicId });
351 if (result.changes != 1) {
352 throw new DBErrors.UnexpectedResult('did not delete subscription');
353 }
354 return this._engineInfo(result);
355 } catch (e) {
356 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
357 throw e;
358 }
359 }
360
361
362 subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
363 const _scope = _fileScope('subscriptionDeliveryClaim');
364 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
365
366 let subscriptionIds;
367 try {
368 this.db.transaction(() => {
369 subscriptionIds = this.statement.subscriptionDeliveryNeeded.all({ wanted }).map((claim) => claim.id);
370 subscriptionIds.forEach((subscriptionId) => {
371 const result = this.statement.subscriptionDeliveryClaimById.run({ subscriptionId, claimTimeoutSeconds, claimant });
372 if (result.changes != 1) {
373 throw new DBErrors.UnexpectedResult('did not claim subscription delivery');
374 }
375 });
376 })();
377 return subscriptionIds;
378 } catch (e) {
379 this.logger.error(_scope, 'failed', { error: e, wanted, claimTimeoutSeconds, claimant, subscriptionIds });
380 throw e;
381 }
382 }
383
384
385 subscriptionDeliveryClaimById(dbCtx, subscriptionId, claimTimeoutSeconds, claimant) {
386 const _scope = _fileScope('subscriptionDeliveryClaimById');
387 this.logger.debug(_scope, 'called', { subscriptionId, claimTimeoutSeconds, claimant });
388
389 try {
390 const result = this.statement.subscriptionDeliveryClaimById.run({ subscriptionId, claimTimeoutSeconds, claimant });
391 if (result.changes != 1) {
392 throw new DBErrors.UnexpectedResult('did not claim subscription delivery');
393 }
394 return this._engineInfo(result);
395 } catch (e) {
396 this.logger.error(_scope, 'failed', { error: e, subscriptionId, claimTimeoutSeconds, claimant });
397 throw e;
398 }
399 }
400
401
402 subscriptionDeliveryComplete(dbCtx, callback, topicId) {
403 const _scope = _fileScope('subscriptionDeliveryComplete');
404 this.logger.debug(_scope, 'called', { callback, topicId });
405
406 let result;
407 try {
408 this.db.transaction(() => {
409 result = this.statement.subscriptionDeliverySuccess.run({ callback, topicId });
410 if (result.changes != 1) {
411 throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
412 }
413 result = this.statement.subscriptionDeliveryDone.run({ callback, topicId });
414 if (result.changes != 1) {
415 throw new DBErrors.UnexpectedResult('did not complete subscription delivery');
416 }
417 })();
418 return this._engineInfo(result);
419 } catch (e) {
420 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
421 throw e;
422 }
423 }
424
425
426 subscriptionDeliveryGone(dbCtx, callback, topicId) {
427 const _scope = _fileScope('subscriptionDeliveryGone');
428 this.logger.debug(_scope, 'called', { callback, topicId });
429
430 let result;
431 try {
432 this.db.transaction(() => {
433 result = this.statement.subscriptionDelete.run({ callback, topicId });
434 if (result.changes != 1) {
435 throw new DBErrors.UnexpectedResult('did not delete subscription');
436 }
437 // Delete cascades to delivery
438 // result = this.statement.subscriptionDeliveryDone.run({ callback, topicId });
439 // if (result.changes != 1) {
440 // throw new DBErrors.UnexpectedResult('did not complete subscription delivery');
441 // }
442 })();
443 return this._engineInfo(result);
444 } catch (e) {
445 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
446 throw e;
447 }
448 }
449
450
451 subscriptionDeliveryIncomplete(dbCtx, callback, topicId, retryDelays = [60]) {
452 const _scope = _fileScope('subscriptionDeliveryIncomplete');
453 this.logger.debug(_scope, 'called', { callback, topicId, retryDelays });
454
455 let result;
456 try {
457 this.db.transaction(() => {
458 const { currentAttempt } = this.statement.subscriptionDeliveryAttempts.get({ callback, topicId });
459 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
460 result = this.statement.subscriptionDeliveryFailure.run({ nextAttemptDelaySeconds, callback, topicId });
461 if (result.changes != 1) {
462 throw new DBErrors.UnexpectedResult('did not set delivery failure');
463 }
464 result = this.statement.subscriptionDeliveryDone.run({ callback, topicId });
465 if (result.changes != 1) {
466 throw new DBErrors.UnexpectedResult('did not complete subscription delivery');
467 }
468 })();
469 } catch (e) {
470 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
471 throw e;
472 }
473 }
474
475
476 subscriptionGet(dbCtx, callback, topicId) {
477 const _scope = _fileScope('subscriptionGet');
478 this.logger.debug(_scope, 'called', { callback, topicId });
479
480 let subscription;
481 try {
482 subscription = this.statement.subscriptionGet.get({ callback, topicId });
483 return DatabaseSQLite._subscriptionDataToNative(subscription);
484 } catch (e) {
485 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
486 throw e;
487 }
488 }
489
490
491 subscriptionGetById(dbCtx, subscriptionId) {
492 const _scope = _fileScope('subscriptionGetById');
493 this.logger.debug(_scope, 'called', { subscriptionId });
494
495 let subscription;
496 try {
497 subscription = this.statement.subscriptionGetById.get({ subscriptionId });
498 return DatabaseSQLite._subscriptionDataToNative(subscription);
499 } catch (e) {
500 this.logger.error(_scope, 'failed', { error: e, subscriptionId });
501 throw e;
502 }
503 }
504
505
506 subscriptionUpdate(dbCtx, data) {
507 const _scope = _fileScope('subscriptionUpdate');
508 this.logger.debug(_scope, 'called', { data });
509
510 const subscriptionData = {
511 ...data,
512 };
513
514 this._subscriptionUpdateDataValidate(subscriptionData);
515
516 try {
517 const result = this.statement.subscriptionUpdate.run(subscriptionData);
518 if (result.changes != 1) {
519 throw new DBErrors.UnexpectedResult('did not update subscription');
520 }
521 } catch (e) {
522 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
523 throw e;
524 }
525 }
526
527
528 subscriptionUpsert(dbCtx, data) {
529 const _scope = _fileScope('subscriptionUpsert');
530 this.logger.debug(_scope, 'called', { ...data });
531
532 const subscriptionData = {
533 secret: null,
534 httpRemoteAddr: null,
535 httpFrom: null,
536 ...data,
537 }
538 this._subscriptionUpsertDataValidate(subscriptionData);
539
540 let result;
541 try {
542 result = this.statement.subscriptionUpsert.run(subscriptionData);
543 if (result.changes != 1) {
544 throw new DBErrors.UnexpectedResult('did not upsert subscription');
545 }
546 return this._engineInfo(result);
547 } catch (e) {
548 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
549 throw e;
550 }
551 }
552
553
554 topicDeleted(dbCtx, topicId) {
555 const _scope = _fileScope('topicDeleted');
556 this.logger.debug(_scope, 'called', { topicId });
557
558 let result;
559 try {
560 result = this.statement.topicDeleted.run({ topicId });
561 if (result.changes != 1) {
562 throw new DBErrors.UnexpectedResult('did not update topic as deleted');
563 }
564 } catch (e) {
565 this.logger.error(_scope, 'failed', { error: e, topicId });
566 throw e;
567 }
568 }
569
570
571 topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
572 const _scope = _fileScope('topicFetchClaim');
573 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
574
575 let topicIds;
576 try {
577 this.db.transaction(() => {
578 topicIds = this.statement.topicContentFetchNeeded.all({ wanted }).map((claim) => claim.id);
579 topicIds.forEach((topicId) => {
580 const result = this.statement.topicContentFetchClaimById.run({ topicId, claimTimeoutSeconds, claimant });
581 if (result.changes != 1) {
582 throw new DBErrors.UnexpectedResult('did not claim topic fetch');
583 }
584 });
585 })();
586 return topicIds;
587 } catch (e) {
588 this.logger.error(_scope, 'failed to claim topics for fetch', { error: e, wanted, claimTimeoutSeconds, claimant, topicIds });
589 throw e;
590 }
591 }
592
593
594 topicFetchClaimById(dbCtx, topicId, claimTimeoutSeconds, claimant) {
595 const _scope = _fileScope('topicFetchClaimById');
596 this.logger.debug(_scope, 'called', { topicId, claimTimeoutSeconds, claimant });
597
598 let result;
599 try {
600 result = this.statement.topicContentFetchClaimById.run({ topicId, claimTimeoutSeconds, claimant });
601 if (result.changes != 1) {
602 throw new DBErrors.UnexpectedResult('did not claim topic fetch');
603 }
604 return this._engineInfo(result);
605 } catch (e) {
606 this.logger.error(_scope, 'failed to claim topics for fetch', { error: e, topicId, claimTimeoutSeconds, claimant });
607 throw e;
608 }
609 }
610
611
612 topicFetchComplete(dbCtx, topicId) {
613 const _scope = _fileScope('topicFetchComplete');
614 this.logger.debug(_scope, 'called', { topicId });
615
616 let result;
617 try {
618 this.db.transaction(() => {
619 result = this.statement.topicAttemptsReset.run({ topicId, forever: EPOCH_FOREVER });
620 if (result.changes != 1) {
621 throw new DBErrors.UnexpectedResult('did not reset topic attempts');
622 }
623 result = this.statement.topicContentFetchDone.run({ topicId });
624 if (result.changes != 1) {
625 throw new DBErrors.UnexpectedResult('did not release topic fetch');
626 }
627 })();
628 return this._engineInfo(result);
629 } catch (e) {
630 this.logger.error(_scope, 'failed', { error: e, result, topicId });
631 throw e;
632 }
633 }
634
635
636 topicFetchIncomplete(dbCtx, topicId, retryDelays = [60]) {
637 const _scope = _fileScope('topicFetchIncomplete');
638 this.logger.debug(_scope, 'called', { topicId });
639
640 let result;
641 try {
642 this.db.transaction(() => {
643 const { contentFetchAttemptsSinceSuccess: currentAttempt } = this.statement.topicAttempts.get({ topicId });
644 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
645 result = this.statement.topicAttemptsIncrement.run({ topicId, nextAttemptDelaySeconds });
646 if (result.changes != 1) {
647 throw new DBErrors.UnexpectedResult('did not set topic attempts');
648 }
649 result = this.statement.topicContentFetchDone.run({ topicId });
650 if (result.changes != 1) {
651 throw new DBErrors.UnexpectedResult('did not release topic fetch');
652 }
653 return result;
654 })();
655 return this._engineInfo(result);
656 } catch (e) {
657 this.logger.error(_scope, 'failed', { error: e, result, topicId });
658 throw e;
659 }
660 }
661
662
663 topicFetchRequested(dbCtx, topicId) {
664 const _scope = _fileScope('topicFetchRequested');
665 this.logger.debug(_scope, 'called', { topicId });
666
667 let result;
668 try {
669 result = this.statement.topicContentFetchRequested.run({ topicId });
670 if (result.changes != 1) {
671 throw new DBErrors.UnexpectedResult('did not set topic fetch requested');
672 }
673 return this._engineInfo(result);
674 } catch (e) {
675 this.logger.error(_scope, 'failed', { error: e, topicId });
676 throw e;
677 }
678 }
679
680
681 /**
682 * Converts engine topic fields to native types.
683 * @param {Object} data
684 */
685 static _topicDataToNative(data) {
686 const epochToDate = (epoch) => new Date(Number(epoch) * 1000);
687 if (data) {
688 data.isActive = !!data.isActive;
689 data.isDeleted = !!data.isDeleted;
690 ['created', 'lastPublish', 'contentFetchNextAttempt', 'contentUpdated'].forEach((field) => {
691 // eslint-disable-next-line security/detect-object-injection
692 data[field] = epochToDate(data[field]);
693 });
694 }
695 return data;
696 }
697
698
699 // eslint-disable-next-line no-unused-vars
700 topicGetAll(dbCtx) {
701 const _scope = _fileScope('topicGetAll');
702 this.logger.debug(_scope, 'called');
703
704 let topics;
705 try {
706 topics = this.statement.topicGetInfoAll.all();
707 } catch (e) {
708 this.logger.error(_scope, 'failed', { error: e, topics });
709 throw e;
710 }
711 if (topics) {
712 topics = topics
713 .map(DatabaseSQLite._topicDataToNative)
714 .map(this._topicDefaults.bind(this));
715 }
716 return topics;
717 }
718
719
720 topicGetById(dbCtx, topicId, applyDefaults = true) {
721 const _scope = _fileScope('topicGetById');
722 this.logger.debug(_scope, 'called', { topicId });
723
724 let topic;
725 try {
726 topic = this.statement.topicGetById.get({ topicId });
727 DatabaseSQLite._topicDataToNative(topic);
728 if (applyDefaults) {
729 topic = this._topicDefaults(topic);
730 }
731 return topic;
732 } catch (e) {
733 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
734 throw e;
735 }
736 }
737
738
739 topicGetByUrl(dbCtx, topicUrl) {
740 const _scope = _fileScope('topicGetByUrl');
741 this.logger.debug(_scope, 'called', { topicUrl });
742
743 let topic;
744 try {
745 topic = this.statement.topicGetByUrl.get({ topicUrl });
746 DatabaseSQLite._topicDataToNative(topic);
747 return this._topicDefaults(topic);
748 } catch (e) {
749 this.logger.error(_scope, 'failed', { error: e, topic, topicUrl });
750 throw e;
751 }
752 }
753
754
755 topicGetContentById(dbCtx, topicId) {
756 const _scope = _fileScope('topicGetContentById');
757 this.logger.debug(_scope, 'called', { topicId });
758
759 let topic;
760 try {
761 topic = this.statement.topicGetContentById.get({ topicId });
762 DatabaseSQLite._topicDataToNative(topic);
763 return this._topicDefaults(topic);
764 } catch (e) {
765 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
766 throw e;
767 }
768 }
769
770
771 topicSet(dbCtx, data) {
772 const _scope = _fileScope('topicSet');
773 this.logger.debug(_scope, 'called', data);
774
775 const topicSetData = {
776 publisherValidationUrl: null,
777 leaseSecondsPreferred: null,
778 leaseSecondsMin: null,
779 leaseSecondsMax: null,
780 ...data,
781 };
782
783 let result;
784 try {
785 this._topicSetDataValidate(topicSetData);
786 result = this.statement.topicUpsert.run(topicSetData);
787 if (result.changes != 1) {
788 throw new DBErrors.UnexpectedResult('did not set topic data');
789 }
790 return this._engineInfo(result);
791 } catch (e) {
792 this.logger.error(_scope, 'failed', { error: e, result });
793 throw e;
794 }
795 }
796
797
798 topicSetContent(dbCtx, data) {
799 const _scope = _fileScope('topicSetContent');
800 const topicSetContentData = {
801 contentType: null,
802 ...data,
803 };
804 const logData = {
805 ...topicSetContentData,
806 content: common.logTruncate(topicSetContentData.content, 100),
807 };
808 this.logger.debug(_scope, 'called', logData);
809
810 let result;
811 try {
812 this._topicSetContentDataValidate(topicSetContentData);
813 result = this.statement.topicSetContent.run(topicSetContentData);
814 logData.result = result;
815 if (result.changes != 1) {
816 throw new DBErrors.UnexpectedResult('did not set topic content');
817 }
818 return this._engineInfo(result);
819 } catch (e) {
820 this.logger.error(_scope, 'failed', { error: e, ...logData });
821 throw e;
822 }
823 }
824
825
826 topicUpdate(dbCtx, data) {
827 const _scope = _fileScope('topicUpdate');
828 this.logger.debug(_scope, 'called', { data });
829
830 const topicData = {
831 leaseSecondsPreferred: null,
832 leaseSecondsMin: null,
833 leaseSecondsMax: null,
834 publisherValidationUrl: null,
835 ...data,
836 };
837
838 this._topicUpdateDataValidate(topicData);
839
840 try {
841 const result = this.statement.topicUpdate.run(topicData);
842 if (result.changes != 1) {
843 throw new DBErrors.UnexpectedResult('did not update topic');
844 }
845 } catch (e) {
846 this.logger.error(_scope, 'failed', { error: e, topicData });
847 throw e;
848 }
849 }
850
851
852 verificationClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
853 const _scope = _fileScope('verificationClaim');
854 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
855
856 let verificationIds;
857 try {
858 this.db.transaction(() => {
859 verificationIds = this.statement.verificationNeeded.all({ wanted }).map((claim) => claim.id);
860 verificationIds.forEach((verificationId) => {
861 const result = this.statement.verificationClaimById.run({ verificationId, claimTimeoutSeconds, claimant });
862 if (result.changes != 1) {
863 throw new DBErrors.UnexpectedResult('did not claim verification');
864 }
865 });
866 })();
867 return verificationIds;
868 } catch (e) {
869 this.logger.error(_scope, 'failed to claim verifications', { wanted, claimTimeoutSeconds });
870 throw e;
871 }
872 }
873
874
875 verificationClaimById(dbCtx, verificationId, claimTimeoutSeconds, claimant) {
876 const _scope = _fileScope('verificationClaimById');
877 this.logger.debug(_scope, 'called', { verificationId, claimTimeoutSeconds, claimant });
878
879 let result;
880 try {
881 result = this.statement.verificationClaimById.run({ verificationId, claimTimeoutSeconds, claimant });
882 if (result.changes != 1) {
883 throw new DBErrors.UnexpectedResult('did not claim verification');
884 }
885 return this._engineInfo(result);
886 } catch (e) {
887 this.logger.error(_scope, 'failed to claim verification', { error: e, verificationId, claimTimeoutSeconds, claimant });
888 throw e;
889 }
890 }
891
892
893 verificationComplete(dbCtx, verificationId, callback, topicId) {
894 const _scope = _fileScope('verificationComplete');
895 this.logger.debug(_scope, 'called', { verificationId });
896
897 let result;
898 try {
899 this.db.transaction(() => {
900 result = this.statement.verificationScrub.run({ verificationId, callback, topicId });
901 if (result.changes < 1) {
902 throw new DBErrors.UnexpectedResult('did not remove verifications');
903 }
904 })();
905 } catch (e) {
906 this.logger.error(_scope, 'failed', { verificationId });
907 throw e;
908 }
909 return this._engineInfo(result);
910 }
911
912
913 /**
914 * Converts engine verification fields to native types.
915 * @param {Object} data
916 */
917 static _verificationDataToNative(data) {
918 if (data) {
919 data.isPublisherValidated = !!data.isPublisherValidated;
920 }
921 }
922
923
924 verificationGetById(dbCtx, verificationId) {
925 const _scope = _fileScope('verificationGetById');
926 this.logger.debug(_scope, 'called', { verificationId });
927
928 let verification;
929 try {
930 verification = this.statement.verificationGetById.get({ verificationId });
931 DatabaseSQLite._verificationDataToNative(verification);
932 return verification;
933 } catch (e) {
934 this.logger.error(_scope, 'failed', { error: e, verificationId });
935 throw e;
936 }
937 }
938
939
940 verificationIncomplete(dbCtx, verificationId, retryDelays = [60]) {
941 const _scope = _fileScope('verificationIncomplete');
942 this.logger.debug(_scope, 'called', { verificationId });
943
944 let result;
945 try {
946 this.db.transaction(() => {
947 const { attempts: currentAttempt } = this.statement.verificationAttempts.get({ verificationId });
948 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
949 result = this.statement.verificationAttemptsIncrement.run({ verificationId, nextAttemptDelaySeconds });
950 if (result.changes != 1) {
951 throw new DBErrors.UnexpectedResult('did not increment verification attempts');
952 }
953 result = this.statement.verificationDone.run({ verificationId });
954 if (result.changes != 1) {
955 throw new DBErrors.UnexpectedResult('did not release verification in progress');
956 }
957 return result;
958 })();
959 return this._engineInfo(result);
960 } catch (e) {
961 this.logger.error(_scope, 'failed', { error: e, result, verificationId });
962 throw e;
963 }
964 }
965
966
967 /**
968 * Convert native verification fields to engine types.
969 */
970 static _verificationDataToEngine(data) {
971 if (data) {
972 data.isPublisherValidated = data.isPublisherValidated ? 1 : 0;
973 }
974 }
975
976
977 verificationInsert(dbCtx, verification) {
978 const _scope = _fileScope('verificationInsert');
979 this.logger.debug(_scope, 'called', { verification });
980
981 const verificationData = {
982 secret: null,
983 httpRemoteAddr: null,
984 httpFrom: null,
985 requestId: null,
986 ...verification,
987 };
988
989 let result, verificationId;
990 try {
991 this._verificationDataValidate(verificationData);
992 DatabaseSQLite._verificationDataToEngine(verificationData);
993 result = this.statement.verificationInsert.run(verificationData);
994 if (result.changes != 1) {
995 throw new DBErrors.UnexpectedResult('did not insert verification');
996 }
997 verificationId = result.lastInsertRowid;
998 this.logger.debug(_scope, 'inserted verification', { verificationId });
999
1000 return verificationId;
1001 } catch (e) {
1002 this.logger.error(_scope, 'failed', { error: e, verificationData });
1003 throw e;
1004 }
1005 }
1006
1007
1008 verificationRelease(dbCtx, verificationId) {
1009 const _scope = _fileScope('verificationRelease');
1010 this.logger.debug(_scope, 'called', { verificationId });
1011
1012 let result;
1013 try {
1014 result = this.statement.verificationDone.run({ verificationId });
1015 if (result.changes != 1) {
1016 throw new DBErrors.UnexpectedResult('did not release verification');
1017 }
1018 } catch (e) {
1019 this.logger.error(_scope, 'failed', { error: e, verificationId });
1020 throw e;
1021 }
1022 }
1023
1024
1025 verificationUpdate(dbCtx, verificationId, data) {
1026 const _scope = _fileScope('verificationUpdate');
1027 this.logger.debug(_scope, 'called', { verificationId, data });
1028
1029 const verificationData = {
1030 reason: null,
1031 verificationId,
1032 ...data,
1033 };
1034
1035 let result;
1036 try {
1037 this._verificationUpdateDataValidate(verificationData);
1038 DatabaseSQLite._verificationDataToEngine(verificationData);
1039 result = this.statement.verificationUpdate.run(verificationData);
1040 if (result.changes != 1) {
1041 throw new DBErrors.UnexpectedResult('did not update verification');
1042 }
1043 } catch (e) {
1044 this.logger.error(_scope, 'failed', { error: e, verificationData });
1045 throw e;
1046 }
1047 }
1048
1049
1050 verificationValidated(dbCtx, verificationId) {
1051 const _scope = _fileScope('verificationValidated');
1052 this.logger.debug(_scope, 'called', { verificationId });
1053
1054 let result;
1055 try {
1056 result = this.statement.verificationValidate.run({ verificationId });
1057 if (result.changes != 1) {
1058 throw new DBErrors.UnexpectedResult('did not set verification validation');
1059 }
1060 } catch (e) {
1061 this.logger.error(_scope, 'failed', { error: e, verificationId });
1062 throw e;
1063 }
1064 }
1065
1066
1067 }
1068
1069 module.exports = DatabaseSQLite;