update dependencies and devDependencies, fix lint issues
[websub-hub] / src / db / sqlite / index.js
1 'use strict';
2
3 const common = require('../../common');
4 const Database = require('../base');
5 const DBErrors = require('../errors');
6 const svh = require('../schema-version-helper');
7 const SQLite = require('better-sqlite3');
8 const fs = require('fs');
9 const path = require('path');
10 const { performance } = require('perf_hooks');
11
12 const _fileScope = common.fileScope(__filename);
13
14 const schemaVersionsSupported = {
15 min: {
16 major: 1,
17 minor: 0,
18 patch: 0,
19 },
20 max: {
21 major: 1,
22 minor: 0,
23 patch: 4,
24 },
25 };
26
27 // max of signed int64 (2^63 - 1), should be enough
28 const EPOCH_FOREVER = BigInt('9223372036854775807');
29 const epochToDate = (epoch) => new Date(Number(epoch) * 1000);
30 const dateToEpoch = (date) => Math.round(date.getTime() / 1000);
31
32 class DatabaseSQLite extends Database {
33 constructor(logger, options) {
34 super(logger, options);
35
36 const connectionString = options.db.connectionString || 'sqlite://:memory:';
37 const csDelim = '://';
38 const dbFilename = connectionString.slice(connectionString.indexOf(csDelim) + csDelim.length);
39
40 const queryLogLevel = options.db.queryLogLevel;
41
42 const sqliteOptions = {
43 ...(queryLogLevel && {
44 // eslint-disable-next-line security/detect-object-injection
45 verbose: (query) => this.logger[queryLogLevel](_fileScope('SQLite:verbose'), '', { query }),
46 }),
47 };
48 this.db = new SQLite(dbFilename, sqliteOptions);
49 this.schemaVersionsSupported = schemaVersionsSupported;
50 this.changesSinceLastOptimize = BigInt(0);
51 this.optimizeAfterChanges = options.db.connectionString.optimizeAfterChanges;
52 this.db.pragma('foreign_keys = on'); // Enforce consistency.
53 this.db.pragma('journal_mode = WAL'); // Be faster, expect local filesystem.
54 this.db.defaultSafeIntegers(true); // This probably isn't necessary, but by using these BigInts we keep weird floats out of the query logs.
55
56 this._initTables();
57 this._initStatements();
58 }
59
60
61 /**
62 * SQLite cannot prepare its statements without a schema, ensure such exists.
63 */
64 _initTables() {
65 const _scope = _fileScope('_initTables');
66
67 // Migrations rely upon this table, ensure it exists.
68 const metaVersionTable = '_meta_schema_version';
69 const tableExists = this.db.prepare('SELECT name FROM sqlite_master WHERE type=:type AND name=:name').pluck(true).bind({ type: 'table', name: metaVersionTable });
70 let metaExists = tableExists.get();
71 if (metaExists === undefined) {
72 const fPath = path.join(__dirname, 'sql', 'schema', 'init.sql');
73
74 const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
75 this.db.exec(fSql);
76 metaExists = tableExists.get();
77 /* istanbul ignore if */
78 if (metaExists === undefined) {
79 throw new DBErrors.UnexpectedResult(`did not create ${metaVersionTable} table`);
80 }
81 this.logger.info(_scope, 'created schema version table', { metaVersionTable });
82 }
83
84 // Apply migrations
85 const currentSchema = this._currentSchema();
86 const migrationsWanted = svh.unappliedSchemaVersions(__dirname, currentSchema, this.schemaVersionsSupported);
87 this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
88 migrationsWanted.forEach((v) => {
89 const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
90 try {
91 // eslint-disable-next-line security/detect-non-literal-fs-filename
92 const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
93 this.logger.debug(_scope, 'applying migration', { version: v });
94 const results = this.db.exec(fSql);
95 this.logger.debug(_scope, 'migration results', { results });
96 this.logger.info(_scope, 'applied migration', { version: v });
97 } catch (e) {
98 this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v });
99 throw e;
100 }
101 });
102 }
103
104
105 _initStatements() {
106 const _scope = _fileScope('_initStatements');
107 const sqlDir = path.join(__dirname, 'sql');
108 this.statement = {};
109
110 // Decorate the statement calls we use with timing and logging.
111 const wrapFetch = (logName, statementName, fn) => {
112 const _wrapScope = _fileScope(logName);
113 return (...args) => {
114 const startTimestampMs = performance.now();
115 const rows = fn(...args);
116 DatabaseSQLite._deOphidiate(rows);
117 const elapsedTimeMs = performance.now() - startTimestampMs;
118 this.logger.debug(_wrapScope, 'complete', { statementName, elapsedTimeMs });
119 return rows;
120 };
121 };
122 const wrapRun = (logName, statementName, fn) => {
123 const _wrapScope = _fileScope(logName);
124 return (...args) => {
125 const startTimestampMs = performance.now();
126 const result = fn(...args);
127 const elapsedTimeMs = performance.now() - startTimestampMs;
128 this.logger.debug(_wrapScope, 'complete', { ...result, statementName, elapsedTimeMs });
129 result.duration = elapsedTimeMs;
130 return result;
131 };
132 };
133
134
135 for (const f of fs.readdirSync(sqlDir)) {
136 const fPath = path.join(sqlDir, f);
137 const { name: fName, ext: fExt } = path.parse(f);
138 // eslint-disable-next-line security/detect-non-literal-fs-filename
139 const stat = fs.statSync(fPath);
140 if (!stat.isFile()
141 || fExt.toLowerCase() !== '.sql') {
142 continue;
143 }
144 // eslint-disable-next-line security/detect-non-literal-fs-filename
145 const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
146 const statementName = Database._camelfy(fName.toLowerCase(), '-');
147 let statement;
148 try {
149 statement = this.db.prepare(fSql);
150 } catch (e) {
151 /* istanbul ignore next */
152 this.logger.error(_scope, 'failed to prepare statement', { error: e, file: f });
153 /* istanbul ignore next */
154 throw e;
155 }
156 // eslint-disable-next-line security/detect-object-injection
157 this.statement[statementName] = statement;
158 const { get: origGet, all: origAll, run: origRun } = statement;
159 statement.get = wrapFetch('SQLite:get', statementName, origGet.bind(statement));
160 statement.all = wrapFetch('SQLite:all', statementName, origAll.bind(statement));
161 statement.run = wrapRun('SQLite:run', statementName, origRun.bind(statement));
162 }
163 this.statement._optimize = this.db.prepare('SELECT * FROM pragma_optimize(0x03)');
164
165 this.logger.debug(_scope, 'statements initialized', { statements: Object.keys(this.statement).length });
166 }
167
168
169 static _deOphidiate(rows) {
170 const rowsIsArray = Array.isArray(rows);
171 if (!rowsIsArray) {
172 rows = [rows];
173 }
174 const exemplaryRow = rows[0];
175 for (const prop in exemplaryRow) {
176 const camel = Database._camelfy(prop);
177 if (!(camel in exemplaryRow)) {
178 for (const d of rows) {
179 // eslint-disable-next-line security/detect-object-injection
180 d[camel] = d[prop];
181 // eslint-disable-next-line security/detect-object-injection
182 delete d[prop];
183 }
184 }
185 }
186 return rowsIsArray ? rows : rows[0];
187 }
188
189
190 _currentSchema() {
191 return this.db.prepare('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1').get();
192 }
193
194
195 healthCheck() {
196 const _scope = _fileScope('healthCheck');
197 this.logger.debug(_scope, 'called', {});
198 if (!this.db.open) {
199 throw new DBErrors.UnexpectedResult('database is not open');
200 }
201 return { open: this.db.open };
202 }
203
204
205 _engineInfo(result) {
206 if (result.changes) {
207 this.changesSinceLastOptimize += BigInt(result.changes);
208 this._optimize();
209 }
210 return {
211 changes: Number(result.changes),
212 lastInsertRowid: result.lastInsertRowid,
213 };
214 }
215
216
217 _closeConnection() {
218 this.db.close();
219 }
220
221
222 _optimize() {
223 const _scope = _fileScope('_optimize');
224
225 if (this.optimizeAfterChanges
226 && this.changesSinceLastOptimize >= this.optimizeAfterChanges) {
227 const optimize = this.statement._optimize.all();
228 this.logger.debug(_scope, 'optimize', { optimize });
229 this.db.pragma('optimize');
230 this.changesSinceLastOptimize = BigInt(0);
231 }
232 }
233
234
235 _purgeTables(really) {
236 if (really) {
237 [
238 'topic',
239 'topic_fetch_in_progress',
240 'verification',
241 'verification_in_progress',
242 'subscription',
243 'subscription_delivery_in_progress',
244 ].forEach((table) => {
245 const result = this.db.prepare(`DELETE FROM ${table}`).run();
246 this.logger.debug(_fileScope('_purgeTables'), 'success', { table, result });
247 });
248 }
249 }
250
251
252 context(fn) {
253 return fn(this.db);
254 }
255
256
257 transaction(dbCtx, fn) {
258 dbCtx = dbCtx || this.db;
259 return dbCtx.transaction(fn)();
260 }
261
262
263 authenticationSuccess(dbCtx, identifier) {
264 const _scope = _fileScope('authenticationSuccess');
265 this.logger.debug(_scope, 'called', { identifier });
266
267 let result;
268 try {
269 result = this.statement.authenticationSuccess.run({ identifier });
270 if (result.changes != 1) {
271 throw new DBErrors.UnexpectedResult('did not update authentication success');
272 }
273 } catch (e) {
274 this.logger.error(_scope, 'failed', { error: e, identifier });
275 throw e;
276 }
277 }
278
279
280 authenticationGet(dbCtx, identifier) {
281 const _scope = _fileScope('authenticationGet');
282 this.logger.debug(_scope, 'called', { identifier });
283
284 try {
285 return this.statement.authenticationGet.get({ identifier });
286 } catch (e) {
287 this.logger.error(_scope, 'failed', { error: e, identifier });
288 throw e;
289 }
290 }
291
292
293 authenticationUpsert(dbCtx, identifier, credential) {
294 const _scope = _fileScope('authenticationUpsert');
295 const scrubbedCredential = '*'.repeat((credential || '').length);
296 this.logger.debug(_scope, 'called', { identifier, scrubbedCredential });
297
298 let result;
299 try {
300 result = this.statement.authenticationUpsert.run({ identifier, credential });
301 if (result.changes != 1) {
302 throw new DBErrors.UnexpectedResult('did not upsert authentication');
303 }
304 } catch (e) {
305 this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential });
306 throw e;
307 }
308 }
309
310
311 /**
312 * Converts engine subscription fields to native types.
313 * @param {object} data subscription data
314 * @returns {object} data
315 */
316 static _subscriptionDataToNative(data) {
317 if (data) {
318 ['created', 'verified', 'expires', 'contentDelivered'].forEach((field) => {
319 // eslint-disable-next-line security/detect-object-injection
320 data[field] = epochToDate(data[field]);
321 });
322 }
323 return data;
324 }
325
326
327 subscriptionsByTopicId(dbCtx, topicId) {
328 const _scope = _fileScope('subscriptionsByTopicId');
329 this.logger.debug(_scope, 'called', { topicId });
330
331 try {
332 const subscriptions = this.statement.subscriptionsByTopicId.all({ topicId });
333 return subscriptions.map((s) => DatabaseSQLite._subscriptionDataToNative(s));
334 } catch (e) {
335 this.logger.error(_scope, 'failed', { error: e, topicId });
336 throw e;
337 }
338 }
339
340
341 subscriptionCountByTopicUrl(dbCtx, topicUrl) {
342 const _scope = _fileScope('subscriptionCountByTopicUrl');
343 this.logger.debug(_scope, 'called', { topicUrl });
344
345 try {
346 return this.statement.subscriptionCountByTopicUrl.get({ topicUrl });
347 } catch (e) {
348 this.logger.error(_scope, 'failed', { error: e, topicUrl });
349 throw e;
350 }
351 }
352
353
354 subscriptionDelete(dbCtx, callback, topicId) {
355 const _scope = _fileScope('subscriptionDelete');
356 this.logger.debug(_scope, 'called', { callback, topicId });
357
358 try {
359 const result = this.statement.subscriptionDelete.run({ callback, topicId });
360 if (result.changes != 1) {
361 throw new DBErrors.UnexpectedResult('did not delete subscription');
362 }
363 return this._engineInfo(result);
364 } catch (e) {
365 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
366 throw e;
367 }
368 }
369
370
371 subscriptionDeleteExpired(dbCtx, topicId) {
372 const _scope = _fileScope('subscriptionDeleteExpired');
373 this.logger.debug(_scope, 'called', { topicId });
374
375 try {
376 const result = this.statement.subscriptionDeleteExpired.run({ topicId });
377 this.logger.debug(_scope, 'success', { topicId, deleted: result.changes });
378 return this._engineInfo(result);
379 } catch (e) {
380 this.logger.error(_scope, 'failed', { error: e, topicId });
381 throw e;
382 }
383 }
384
385
386 subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
387 const _scope = _fileScope('subscriptionDeliveryClaim');
388 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
389
390 let subscriptionIds;
391 try {
392 this.db.transaction(() => {
393 subscriptionIds = this.statement.subscriptionDeliveryNeeded.all({ wanted }).map((claim) => claim.id);
394 subscriptionIds.forEach((subscriptionId) => {
395 const result = this.statement.subscriptionDeliveryClaimById.run({ subscriptionId, claimTimeoutSeconds, claimant });
396 if (result.changes != 1) {
397 throw new DBErrors.UnexpectedResult('did not claim subscription delivery');
398 }
399 });
400 })();
401 return subscriptionIds;
402 } catch (e) {
403 this.logger.error(_scope, 'failed', { error: e, wanted, claimTimeoutSeconds, claimant, subscriptionIds });
404 throw e;
405 }
406 }
407
408
409 subscriptionDeliveryClaimById(dbCtx, subscriptionId, claimTimeoutSeconds, claimant) {
410 const _scope = _fileScope('subscriptionDeliveryClaimById');
411 this.logger.debug(_scope, 'called', { subscriptionId, claimTimeoutSeconds, claimant });
412
413 try {
414 const result = this.statement.subscriptionDeliveryClaimById.run({ subscriptionId, claimTimeoutSeconds, claimant });
415 if (result.changes != 1) {
416 throw new DBErrors.UnexpectedResult('did not claim subscription delivery');
417 }
418 return this._engineInfo(result);
419 } catch (e) {
420 this.logger.error(_scope, 'failed', { error: e, subscriptionId, claimTimeoutSeconds, claimant });
421 throw e;
422 }
423 }
424
425
426 subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) {
427 const _scope = _fileScope('subscriptionDeliveryComplete');
428 this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated });
429
430 let result;
431 try {
432 this.db.transaction(() => {
433 topicContentUpdated = dateToEpoch(topicContentUpdated);
434 result = this.statement.subscriptionDeliverySuccess.run({ callback, topicId, topicContentUpdated });
435 if (result.changes != 1) {
436 throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
437 }
438 result = this.statement.subscriptionDeliveryDone.run({ callback, topicId });
439 if (result.changes != 1) {
440 throw new DBErrors.UnexpectedResult('did not complete subscription delivery');
441 }
442 })();
443 return this._engineInfo(result);
444 } catch (e) {
445 this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated });
446 throw e;
447 }
448 }
449
450
451 subscriptionDeliveryGone(dbCtx, callback, topicId) {
452 const _scope = _fileScope('subscriptionDeliveryGone');
453 this.logger.debug(_scope, 'called', { callback, topicId });
454
455 let result;
456 try {
457 this.db.transaction(() => {
458 result = this.statement.subscriptionDelete.run({ callback, topicId });
459 if (result.changes != 1) {
460 throw new DBErrors.UnexpectedResult('did not delete subscription');
461 }
462 // Delete cascades to delivery
463 // result = this.statement.subscriptionDeliveryDone.run({ callback, topicId });
464 // if (result.changes != 1) {
465 // throw new DBErrors.UnexpectedResult('did not complete subscription delivery');
466 // }
467 })();
468 return this._engineInfo(result);
469 } catch (e) {
470 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
471 throw e;
472 }
473 }
474
475
476 subscriptionDeliveryIncomplete(dbCtx, callback, topicId, retryDelays = [60]) {
477 const _scope = _fileScope('subscriptionDeliveryIncomplete');
478 this.logger.debug(_scope, 'called', { callback, topicId, retryDelays });
479
480 let result;
481 try {
482 this.db.transaction(() => {
483 const { currentAttempt } = this.statement.subscriptionDeliveryAttempts.get({ callback, topicId });
484 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
485 result = this.statement.subscriptionDeliveryFailure.run({ nextAttemptDelaySeconds, callback, topicId });
486 if (result.changes != 1) {
487 throw new DBErrors.UnexpectedResult('did not set delivery failure');
488 }
489 result = this.statement.subscriptionDeliveryDone.run({ callback, topicId });
490 if (result.changes != 1) {
491 throw new DBErrors.UnexpectedResult('did not complete subscription delivery');
492 }
493 })();
494 } catch (e) {
495 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
496 throw e;
497 }
498 }
499
500
501 subscriptionGet(dbCtx, callback, topicId) {
502 const _scope = _fileScope('subscriptionGet');
503 this.logger.debug(_scope, 'called', { callback, topicId });
504
505 let subscription;
506 try {
507 subscription = this.statement.subscriptionGet.get({ callback, topicId });
508 return DatabaseSQLite._subscriptionDataToNative(subscription);
509 } catch (e) {
510 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
511 throw e;
512 }
513 }
514
515
516 subscriptionGetById(dbCtx, subscriptionId) {
517 const _scope = _fileScope('subscriptionGetById');
518 this.logger.debug(_scope, 'called', { subscriptionId });
519
520 let subscription;
521 try {
522 subscription = this.statement.subscriptionGetById.get({ subscriptionId });
523 return DatabaseSQLite._subscriptionDataToNative(subscription);
524 } catch (e) {
525 this.logger.error(_scope, 'failed', { error: e, subscriptionId });
526 throw e;
527 }
528 }
529
530
531 subscriptionUpdate(dbCtx, data) {
532 const _scope = _fileScope('subscriptionUpdate');
533 this.logger.debug(_scope, 'called', { data });
534
535 const subscriptionData = {
536 ...data,
537 };
538
539 this._subscriptionUpdateDataValidate(subscriptionData);
540
541 try {
542 const result = this.statement.subscriptionUpdate.run(subscriptionData);
543 if (result.changes != 1) {
544 throw new DBErrors.UnexpectedResult('did not update subscription');
545 }
546 } catch (e) {
547 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
548 throw e;
549 }
550 }
551
552
553 subscriptionUpsert(dbCtx, data) {
554 const _scope = _fileScope('subscriptionUpsert');
555 this.logger.debug(_scope, 'called', { ...data });
556
557 const subscriptionData = {
558 secret: null,
559 httpRemoteAddr: null,
560 httpFrom: null,
561 ...data,
562 };
563 this._subscriptionUpsertDataValidate(subscriptionData);
564
565 let result;
566 try {
567 result = this.statement.subscriptionUpsert.run(subscriptionData);
568 if (result.changes != 1) {
569 throw new DBErrors.UnexpectedResult('did not upsert subscription');
570 }
571 return this._engineInfo(result);
572 } catch (e) {
573 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
574 throw e;
575 }
576 }
577
578
579 topicDeleted(dbCtx, topicId) {
580 const _scope = _fileScope('topicDeleted');
581 this.logger.debug(_scope, 'called', { topicId });
582
583 let result;
584 try {
585 result = this.statement.topicDeleted.run({ topicId });
586 if (result.changes != 1) {
587 throw new DBErrors.UnexpectedResult('did not update topic as deleted');
588 }
589 } catch (e) {
590 this.logger.error(_scope, 'failed', { error: e, topicId });
591 throw e;
592 }
593 }
594
595
596 topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
597 const _scope = _fileScope('topicFetchClaim');
598 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
599
600 let topicIds;
601 try {
602 this.db.transaction(() => {
603 topicIds = this.statement.topicContentFetchNeeded.all({ wanted }).map((claim) => claim.id);
604 topicIds.forEach((topicId) => {
605 const result = this.statement.topicContentFetchClaimById.run({ topicId, claimTimeoutSeconds, claimant });
606 if (result.changes != 1) {
607 throw new DBErrors.UnexpectedResult('did not claim topic fetch');
608 }
609 });
610 })();
611 return topicIds;
612 } catch (e) {
613 this.logger.error(_scope, 'failed to claim topics for fetch', { error: e, wanted, claimTimeoutSeconds, claimant, topicIds });
614 throw e;
615 }
616 }
617
618
619 topicFetchClaimById(dbCtx, topicId, claimTimeoutSeconds, claimant) {
620 const _scope = _fileScope('topicFetchClaimById');
621 this.logger.debug(_scope, 'called', { topicId, claimTimeoutSeconds, claimant });
622
623 let result;
624 try {
625 result = this.statement.topicContentFetchClaimById.run({ topicId, claimTimeoutSeconds, claimant });
626 if (result.changes != 1) {
627 throw new DBErrors.UnexpectedResult('did not claim topic fetch');
628 }
629 return this._engineInfo(result);
630 } catch (e) {
631 this.logger.error(_scope, 'failed to claim topics for fetch', { error: e, topicId, claimTimeoutSeconds, claimant });
632 throw e;
633 }
634 }
635
636
637 topicFetchComplete(dbCtx, topicId) {
638 const _scope = _fileScope('topicFetchComplete');
639 this.logger.debug(_scope, 'called', { topicId });
640
641 let result;
642 try {
643 this.db.transaction(() => {
644 result = this.statement.topicAttemptsReset.run({ topicId, forever: EPOCH_FOREVER });
645 if (result.changes != 1) {
646 throw new DBErrors.UnexpectedResult('did not reset topic attempts');
647 }
648 result = this.statement.topicContentFetchDone.run({ topicId });
649 if (result.changes != 1) {
650 throw new DBErrors.UnexpectedResult('did not release topic fetch');
651 }
652 })();
653 return this._engineInfo(result);
654 } catch (e) {
655 this.logger.error(_scope, 'failed', { error: e, result, topicId });
656 throw e;
657 }
658 }
659
660
661 topicFetchIncomplete(dbCtx, topicId, retryDelays = [60]) {
662 const _scope = _fileScope('topicFetchIncomplete');
663 this.logger.debug(_scope, 'called', { topicId });
664
665 let result;
666 try {
667 this.db.transaction(() => {
668 const { contentFetchAttemptsSinceSuccess: currentAttempt } = this.statement.topicAttempts.get({ topicId });
669 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
670 result = this.statement.topicAttemptsIncrement.run({ topicId, nextAttemptDelaySeconds });
671 if (result.changes != 1) {
672 throw new DBErrors.UnexpectedResult('did not set topic attempts');
673 }
674 result = this.statement.topicContentFetchDone.run({ topicId });
675 if (result.changes != 1) {
676 throw new DBErrors.UnexpectedResult('did not release topic fetch');
677 }
678 return result;
679 })();
680 return this._engineInfo(result);
681 } catch (e) {
682 this.logger.error(_scope, 'failed', { error: e, result, topicId });
683 throw e;
684 }
685 }
686
687
688 topicFetchRequested(dbCtx, topicId) {
689 const _scope = _fileScope('topicFetchRequested');
690 this.logger.debug(_scope, 'called', { topicId });
691
692 let result;
693 try {
694 result = this.statement.topicContentFetchRequested.run({ topicId });
695 if (result.changes != 1) {
696 throw new DBErrors.UnexpectedResult('did not set topic fetch requested');
697 }
698 return this._engineInfo(result);
699 } catch (e) {
700 this.logger.error(_scope, 'failed', { error: e, topicId });
701 throw e;
702 }
703 }
704
705
706 /**
707 * Converts engine topic fields to native types.
708 * @param {object} data topic
709 * @returns {object} topic data
710 */
711 static _topicDataToNative(data) {
712 if (data) {
713 data.isActive = !!data.isActive;
714 data.isDeleted = !!data.isDeleted;
715 ['created', 'lastPublish', 'contentFetchNextAttempt', 'contentUpdated'].forEach((field) => {
716 // eslint-disable-next-line security/detect-object-injection
717 data[field] = epochToDate(data[field]);
718 });
719 }
720 return data;
721 }
722
723
724 // eslint-disable-next-line no-unused-vars
725 topicGetAll(dbCtx) {
726 const _scope = _fileScope('topicGetAll');
727 this.logger.debug(_scope, 'called');
728
729 let topics;
730 try {
731 topics = this.statement.topicGetInfoAll.all();
732 } catch (e) {
733 this.logger.error(_scope, 'failed', { error: e, topics });
734 throw e;
735 }
736 if (topics) {
737 topics = topics
738 .map(DatabaseSQLite._topicDataToNative)
739 .map(this._topicDefaults.bind(this));
740 }
741 return topics;
742 }
743
744
745 topicGetById(dbCtx, topicId, applyDefaults = true) {
746 const _scope = _fileScope('topicGetById');
747 this.logger.debug(_scope, 'called', { topicId });
748
749 let topic;
750 try {
751 topic = this.statement.topicGetById.get({ topicId });
752 DatabaseSQLite._topicDataToNative(topic);
753 if (applyDefaults) {
754 topic = this._topicDefaults(topic);
755 }
756 return topic;
757 } catch (e) {
758 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
759 throw e;
760 }
761 }
762
763
764 topicGetByUrl(dbCtx, topicUrl, applyDefaults = true) {
765 const _scope = _fileScope('topicGetByUrl');
766 this.logger.debug(_scope, 'called', { topicUrl });
767
768 let topic;
769 try {
770 topic = this.statement.topicGetByUrl.get({ topicUrl });
771 DatabaseSQLite._topicDataToNative(topic);
772 if (applyDefaults) {
773 topic = this._topicDefaults(topic);
774 }
775 return topic;
776 } catch (e) {
777 this.logger.error(_scope, 'failed', { error: e, topic, topicUrl });
778 throw e;
779 }
780 }
781
782
783 topicGetContentById(dbCtx, topicId) {
784 const _scope = _fileScope('topicGetContentById');
785 this.logger.debug(_scope, 'called', { topicId });
786
787 let topic;
788 try {
789 topic = this.statement.topicGetContentById.get({ topicId });
790 DatabaseSQLite._topicDataToNative(topic);
791 return this._topicDefaults(topic);
792 } catch (e) {
793 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
794 throw e;
795 }
796 }
797
798
799 topicPendingDelete(dbCtx, topicId) {
800 const _scope = _fileScope('topicPendingDelete');
801 this.logger.debug(_scope, 'called', { topicId });
802
803 try {
804 this.db.transaction(() => {
805 const topic = this.statement.topicGetById.get({ topicId });
806 if (!topic.isDeleted) {
807 this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
808 return;
809 }
810
811 const { count: subscriberCount } = this.statement.subscriptionCountByTopicUrl.get({ topicUrl: topic.url });
812 if (subscriberCount) {
813 this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
814 return;
815 }
816
817 const result = this.statement.topicDeleteById.run({ topicId });
818 if (result.changes !== 1) {
819 throw new DBErrors.UnexpectedResult('did not delete topic');
820 }
821 })();
822 this.logger.debug(_scope, 'success', { topicId });
823 } catch (e) {
824 this.logger.error(_scope, 'failed', { error: e, topicId });
825 throw e;
826 }
827 }
828
829
830 topicPublishHistory(dbCtx, topicId, days) {
831 const _scope = _fileScope('topicPublishHistory');
832 this.logger.debug(_scope, 'called', { topicId, days });
833
834 const events = this.statement.topicPublishHistory.all({ topicId, daysAgo: days });
835 const history = Array.from({ length: days }, () => 0);
836 // eslint-disable-next-line security/detect-object-injection
837 events.forEach(({ daysAgo, contentUpdates }) => history[daysAgo] = Number(contentUpdates));
838
839 return history;
840 }
841
842
843 topicSet(dbCtx, data) {
844 const _scope = _fileScope('topicSet');
845 this.logger.debug(_scope, 'called', data);
846
847 const topicSetData = {
848 publisherValidationUrl: null,
849 leaseSecondsPreferred: null,
850 leaseSecondsMin: null,
851 leaseSecondsMax: null,
852 ...data,
853 };
854
855 let result;
856 try {
857 this._topicSetDataValidate(topicSetData);
858 result = this.statement.topicUpsert.run(topicSetData);
859 if (result.changes != 1) {
860 throw new DBErrors.UnexpectedResult('did not set topic data');
861 }
862 return this._engineInfo(result);
863 } catch (e) {
864 this.logger.error(_scope, 'failed', { error: e, result });
865 throw e;
866 }
867 }
868
869
870 topicSetContent(dbCtx, data) {
871 const _scope = _fileScope('topicSetContent');
872 const topicSetContentData = {
873 contentType: null,
874 httpETag: null,
875 httpLastModified: null,
876 ...data,
877 };
878 const logData = {
879 ...topicSetContentData,
880 content: common.logTruncate(topicSetContentData.content, 100),
881 };
882 this.logger.debug(_scope, 'called', logData);
883
884 let result;
885 try {
886 this._topicSetContentDataValidate(topicSetContentData);
887 result = this.statement.topicSetContent.run(topicSetContentData);
888 logData.result = result;
889 if (result.changes != 1) {
890 throw new DBErrors.UnexpectedResult('did not set topic content');
891 }
892 result = this.statement.topicSetContentHistory.run({
893 topicId: data.topicId,
894 contentHash: data.contentHash,
895 contentSize: data.content.length,
896 });
897 if (result.changes != 1) {
898 throw new DBErrors.UnexpectedResult('did not set topic content history');
899 }
900 return this._engineInfo(result);
901 } catch (e) {
902 this.logger.error(_scope, 'failed', { error: e, ...logData });
903 throw e;
904 }
905 }
906
907
908 topicUpdate(dbCtx, data) {
909 const _scope = _fileScope('topicUpdate');
910 this.logger.debug(_scope, 'called', { data });
911
912 const topicData = {
913 leaseSecondsPreferred: null,
914 leaseSecondsMin: null,
915 leaseSecondsMax: null,
916 publisherValidationUrl: null,
917 ...data,
918 };
919
920 this._topicUpdateDataValidate(topicData);
921
922 try {
923 const result = this.statement.topicUpdate.run(topicData);
924 if (result.changes != 1) {
925 throw new DBErrors.UnexpectedResult('did not update topic');
926 }
927 } catch (e) {
928 this.logger.error(_scope, 'failed', { error: e, topicData });
929 throw e;
930 }
931 }
932
933
934 verificationClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
935 const _scope = _fileScope('verificationClaim');
936 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
937
938 let verificationIds;
939 try {
940 this.db.transaction(() => {
941 verificationIds = this.statement.verificationNeeded.all({ wanted }).map((claim) => claim.id);
942 verificationIds.forEach((verificationId) => {
943 const result = this.statement.verificationClaimById.run({ verificationId, claimTimeoutSeconds, claimant });
944 if (result.changes != 1) {
945 throw new DBErrors.UnexpectedResult('did not claim verification');
946 }
947 });
948 })();
949 return verificationIds;
950 } catch (e) {
951 this.logger.error(_scope, 'failed to claim verifications', { wanted, claimTimeoutSeconds });
952 throw e;
953 }
954 }
955
956
957 verificationClaimById(dbCtx, verificationId, claimTimeoutSeconds, claimant) {
958 const _scope = _fileScope('verificationClaimById');
959 this.logger.debug(_scope, 'called', { verificationId, claimTimeoutSeconds, claimant });
960
961 let result;
962 try {
963 result = this.statement.verificationClaimById.run({ verificationId, claimTimeoutSeconds, claimant });
964 if (result.changes != 1) {
965 throw new DBErrors.UnexpectedResult('did not claim verification');
966 }
967 return this._engineInfo(result);
968 } catch (e) {
969 this.logger.error(_scope, 'failed to claim verification', { error: e, verificationId, claimTimeoutSeconds, claimant });
970 throw e;
971 }
972 }
973
974
975 verificationComplete(dbCtx, verificationId, callback, topicId) {
976 const _scope = _fileScope('verificationComplete');
977 this.logger.debug(_scope, 'called', { verificationId });
978
979 let result;
980 try {
981 this.db.transaction(() => {
982 result = this.statement.verificationScrub.run({ verificationId, callback, topicId });
983 if (result.changes < 1) {
984 throw new DBErrors.UnexpectedResult('did not remove verifications');
985 }
986 })();
987 } catch (e) {
988 this.logger.error(_scope, 'failed', { verificationId });
989 throw e;
990 }
991 return this._engineInfo(result);
992 }
993
994
995 /**
996 * Converts engine verification fields to native types.
997 * @param {object} data verification
998 */
999 static _verificationDataToNative(data) {
1000 if (data) {
1001 data.isPublisherValidated = !!data.isPublisherValidated;
1002 }
1003 }
1004
1005
1006 verificationGetById(dbCtx, verificationId) {
1007 const _scope = _fileScope('verificationGetById');
1008 this.logger.debug(_scope, 'called', { verificationId });
1009
1010 let verification;
1011 try {
1012 verification = this.statement.verificationGetById.get({ verificationId });
1013 DatabaseSQLite._verificationDataToNative(verification);
1014 return verification;
1015 } catch (e) {
1016 this.logger.error(_scope, 'failed', { error: e, verificationId });
1017 throw e;
1018 }
1019 }
1020
1021
1022 verificationIncomplete(dbCtx, verificationId, retryDelays = [60]) {
1023 const _scope = _fileScope('verificationIncomplete');
1024 this.logger.debug(_scope, 'called', { verificationId });
1025
1026 let result;
1027 try {
1028 this.db.transaction(() => {
1029 const { attempts: currentAttempt } = this.statement.verificationAttempts.get({ verificationId });
1030 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
1031 result = this.statement.verificationAttemptsIncrement.run({ verificationId, nextAttemptDelaySeconds });
1032 if (result.changes != 1) {
1033 throw new DBErrors.UnexpectedResult('did not increment verification attempts');
1034 }
1035 result = this.statement.verificationDone.run({ verificationId });
1036 if (result.changes != 1) {
1037 throw new DBErrors.UnexpectedResult('did not release verification in progress');
1038 }
1039 return result;
1040 })();
1041 return this._engineInfo(result);
1042 } catch (e) {
1043 this.logger.error(_scope, 'failed', { error: e, result, verificationId });
1044 throw e;
1045 }
1046 }
1047
1048
1049 /**
1050 * Convert native verification fields to engine types.
1051 * @param {object} data verification
1052 */
1053 static _verificationDataToEngine(data) {
1054 if (data) {
1055 data.isPublisherValidated = data.isPublisherValidated ? 1 : 0;
1056 }
1057 }
1058
1059
1060 verificationInsert(dbCtx, verification) {
1061 const _scope = _fileScope('verificationInsert');
1062 this.logger.debug(_scope, 'called', { verification });
1063
1064 const verificationData = {
1065 secret: null,
1066 httpRemoteAddr: null,
1067 httpFrom: null,
1068 requestId: null,
1069 ...verification,
1070 };
1071
1072 let result, verificationId;
1073 try {
1074 this._verificationDataValidate(verificationData);
1075 DatabaseSQLite._verificationDataToEngine(verificationData);
1076 result = this.statement.verificationInsert.run(verificationData);
1077 if (result.changes != 1) {
1078 throw new DBErrors.UnexpectedResult('did not insert verification');
1079 }
1080 verificationId = result.lastInsertRowid;
1081 this.logger.debug(_scope, 'inserted verification', { verificationId });
1082
1083 return verificationId;
1084 } catch (e) {
1085 this.logger.error(_scope, 'failed', { error: e, verificationData });
1086 throw e;
1087 }
1088 }
1089
1090
1091 verificationRelease(dbCtx, verificationId) {
1092 const _scope = _fileScope('verificationRelease');
1093 this.logger.debug(_scope, 'called', { verificationId });
1094
1095 let result;
1096 try {
1097 result = this.statement.verificationDone.run({ verificationId });
1098 if (result.changes != 1) {
1099 throw new DBErrors.UnexpectedResult('did not release verification');
1100 }
1101 } catch (e) {
1102 this.logger.error(_scope, 'failed', { error: e, verificationId });
1103 throw e;
1104 }
1105 }
1106
1107
1108 verificationUpdate(dbCtx, verificationId, data) {
1109 const _scope = _fileScope('verificationUpdate');
1110 this.logger.debug(_scope, 'called', { verificationId, data });
1111
1112 const verificationData = {
1113 reason: null,
1114 verificationId,
1115 ...data,
1116 };
1117
1118 let result;
1119 try {
1120 this._verificationUpdateDataValidate(verificationData);
1121 DatabaseSQLite._verificationDataToEngine(verificationData);
1122 result = this.statement.verificationUpdate.run(verificationData);
1123 if (result.changes != 1) {
1124 throw new DBErrors.UnexpectedResult('did not update verification');
1125 }
1126 } catch (e) {
1127 this.logger.error(_scope, 'failed', { error: e, verificationData });
1128 throw e;
1129 }
1130 }
1131
1132
1133 verificationValidated(dbCtx, verificationId) {
1134 const _scope = _fileScope('verificationValidated');
1135 this.logger.debug(_scope, 'called', { verificationId });
1136
1137 let result;
1138 try {
1139 result = this.statement.verificationValidate.run({ verificationId });
1140 if (result.changes != 1) {
1141 throw new DBErrors.UnexpectedResult('did not set verification validation');
1142 }
1143 } catch (e) {
1144 this.logger.error(_scope, 'failed', { error: e, verificationId });
1145 throw e;
1146 }
1147 }
1148
1149
1150 }
1151
1152 module.exports = DatabaseSQLite;