update dependencies, fixes to support new authentication features
[websub-hub] / src / db / sqlite / index.js
1 'use strict';
2
3 const common = require('../../common');
4 const Database = require('../base');
5 const DBErrors = require('../errors');
6 const svh = require('../schema-version-helper');
7 const SQLite = require('better-sqlite3');
8 const fs = require('fs');
9 const path = require('path');
10 const { performance } = require('perf_hooks');
11
// Scope factory bound to this file's name; each call's result is passed as the first argument to the logger calls below.
12 const _fileScope = common.fileScope(__filename);
13
// Lowest and highest schema versions this engine supports; handed to the
// schema version helper when working out which migrations are still unapplied.
const schemaVersionsSupported = {
  min: { major: 1, minor: 0, patch: 0 },
  max: { major: 1, minor: 1, patch: 0 },
};
26
// Largest signed 64-bit integer (2^63 - 1); far enough in the future to mean "never".
const EPOCH_FOREVER = BigInt('9223372036854775807');

/**
 * Convert an epoch expressed in seconds into a Date.
 * @param {number|bigint} epoch seconds since the Unix epoch
 * @returns {Date} equivalent date
 */
function epochToDate(epoch) {
  const seconds = Number(epoch);
  return new Date(seconds * 1000);
}

/**
 * Convert a Date into an epoch expressed in seconds, rounded to the nearest second.
 * @param {Date} date date to convert
 * @returns {number} seconds since the Unix epoch
 */
function dateToEpoch(date) {
  const milliseconds = date.getTime();
  return Math.round(milliseconds / 1000);
}
31
32 class DatabaseSQLite extends Database {
33 constructor(logger, options) {
34 super(logger, options);
35
36 const connectionString = options.db.connectionString || 'sqlite://:memory:';
37 const csDelim = '://';
38 const dbFilename = connectionString.slice(connectionString.indexOf(csDelim) + csDelim.length);
39
40 const queryLogLevel = options.db.queryLogLevel;
41
42 const sqliteOptions = {
43 ...(queryLogLevel && {
44 // eslint-disable-next-line security/detect-object-injection
45 verbose: (query) => this.logger[queryLogLevel](_fileScope('SQLite:verbose'), '', { query }),
46 }),
47 };
48 this.db = new SQLite(dbFilename, sqliteOptions);
49 this.schemaVersionsSupported = schemaVersionsSupported;
50 this.changesSinceLastOptimize = BigInt(0);
51 this.optimizeAfterChanges = options.db.optimizeAfterChanges;
52 this.db.pragma('foreign_keys = on'); // Enforce consistency.
53 this.db.pragma('journal_mode = WAL'); // Be faster, expect local filesystem.
54 this.db.defaultSafeIntegers(true); // This probably isn't necessary, but by using these BigInts we keep weird floats out of the query logs.
55
56 this._initTables();
57 this._initStatements();
58 }
59
60
61 /**
62 * SQLite cannot prepare its statements without a schema, ensure such exists.
63 */
64 _initTables() {
65 const _scope = _fileScope('_initTables');
66
67 // Migrations rely upon this table, ensure it exists.
68 const metaVersionTable = '_meta_schema_version';
69 const tableExists = this.db.prepare('SELECT name FROM sqlite_master WHERE type=:type AND name=:name').pluck(true).bind({ type: 'table', name: metaVersionTable });
70 let metaExists = tableExists.get();
71 if (metaExists === undefined) {
72 const fPath = path.join(__dirname, 'sql', 'schema', 'init.sql');
73
74 const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
75 this.db.exec(fSql);
76 metaExists = tableExists.get();
77 /* istanbul ignore if */
78 if (metaExists === undefined) {
79 throw new DBErrors.UnexpectedResult(`did not create ${metaVersionTable} table`);
80 }
81 this.logger.info(_scope, 'created schema version table', { metaVersionTable });
82 }
83
84 // Apply migrations
85 const currentSchema = this._currentSchema();
86 const migrationsWanted = svh.unappliedSchemaVersions(__dirname, currentSchema, this.schemaVersionsSupported);
87 this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
88 migrationsWanted.forEach((v) => {
89 const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
90 try {
91 // eslint-disable-next-line security/detect-non-literal-fs-filename
92 const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
93 this.logger.debug(_scope, 'applying migration', { version: v });
94 const results = this.db.exec(fSql);
95 this.logger.debug(_scope, 'migration results', { results });
96 this.logger.info(_scope, 'applied migration', { version: v });
97 } catch (e) {
98 this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v });
99 throw e;
100 }
101 });
102 }
103
104
105 _initStatements() {
106 const _scope = _fileScope('_initStatements');
107 const sqlDir = path.join(__dirname, 'sql');
108 this.statement = {};
109
110 // Decorate the statement calls we use with timing and logging.
111 const wrapFetch = (logName, statementName, fn) => {
112 const _wrapScope = _fileScope(logName);
113 return (...args) => {
114 const startTimestampMs = performance.now();
115 const rows = fn(...args);
116 DatabaseSQLite._deOphidiate(rows);
117 const elapsedTimeMs = performance.now() - startTimestampMs;
118 this.logger.debug(_wrapScope, 'complete', { statementName, elapsedTimeMs });
119 return rows;
120 };
121 };
122 const wrapRun = (logName, statementName, fn) => {
123 const _wrapScope = _fileScope(logName);
124 return (...args) => {
125 const startTimestampMs = performance.now();
126 const result = fn(...args);
127 const elapsedTimeMs = performance.now() - startTimestampMs;
128 this.logger.debug(_wrapScope, 'complete', { ...result, statementName, elapsedTimeMs });
129 result.duration = elapsedTimeMs;
130 return result;
131 };
132 };
133
134
135 for (const f of fs.readdirSync(sqlDir)) {
136 const fPath = path.join(sqlDir, f);
137 const { name: fName, ext: fExt } = path.parse(f);
138 // eslint-disable-next-line security/detect-non-literal-fs-filename
139 const stat = fs.statSync(fPath);
140 if (!stat.isFile()
141 || fExt.toLowerCase() !== '.sql') {
142 continue;
143 }
144 // eslint-disable-next-line security/detect-non-literal-fs-filename
145 const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
146 const statementName = Database._camelfy(fName.toLowerCase(), '-');
147 let statement;
148 try {
149 statement = this.db.prepare(fSql);
150 } catch (e) {
151 /* istanbul ignore next */
152 this.logger.error(_scope, 'failed to prepare statement', { error: e, file: f });
153 /* istanbul ignore next */
154 throw e;
155 }
156 // eslint-disable-next-line security/detect-object-injection
157 this.statement[statementName] = statement;
158 const { get: origGet, all: origAll, run: origRun } = statement;
159 statement.get = wrapFetch('SQLite:get', statementName, origGet.bind(statement));
160 statement.all = wrapFetch('SQLite:all', statementName, origAll.bind(statement));
161 statement.run = wrapRun('SQLite:run', statementName, origRun.bind(statement));
162 }
163 this.statement._optimize = this.db.prepare('SELECT * FROM pragma_optimize(0x03)');
164
165 this.logger.debug(_scope, 'statements initialized', { statements: Object.keys(this.statement).length });
166 }
167
168
169 static _deOphidiate(rows) {
170 const rowsIsArray = Array.isArray(rows);
171 if (!rowsIsArray) {
172 rows = [rows];
173 }
174 const exemplaryRow = rows[0];
175 for (const prop in exemplaryRow) {
176 const camel = Database._camelfy(prop);
177 if (!(camel in exemplaryRow)) {
178 for (const d of rows) {
179 // eslint-disable-next-line security/detect-object-injection
180 d[camel] = d[prop];
181 // eslint-disable-next-line security/detect-object-injection
182 delete d[prop];
183 }
184 }
185 }
186 return rowsIsArray ? rows : rows[0];
187 }
188
189
190 _currentSchema() {
191 return this.db.prepare('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1').get();
192 }
193
194
195 healthCheck() {
196 const _scope = _fileScope('healthCheck');
197 this.logger.debug(_scope, 'called', {});
198 if (!this.db.open) {
199 throw new DBErrors.UnexpectedResult('database is not open');
200 }
201 return { open: this.db.open };
202 }
203
204
205 _engineInfo(result) {
206 if (result.changes) {
207 this.changesSinceLastOptimize += BigInt(result.changes);
208 this._optimize();
209 }
210 return {
211 changes: Number(result.changes),
212 lastInsertRowid: result.lastInsertRowid,
213 };
214 }
215
216
217 _closeConnection() {
218 this.db.close();
219 }
220
221
222 _optimize() {
223 const _scope = _fileScope('_optimize');
224
225 if (this.optimizeAfterChanges
226 && this.changesSinceLastOptimize >= this.optimizeAfterChanges) {
227 const optimize = this.statement._optimize.all();
228 this.logger.debug(_scope, 'optimize', { optimize });
229 this.db.pragma('optimize');
230 this.changesSinceLastOptimize = BigInt(0);
231 }
232 }
233
234
235 _purgeTables(really) {
236 if (really) {
237 [
238 'topic',
239 'topic_fetch_in_progress',
240 'verification',
241 'verification_in_progress',
242 'subscription',
243 'subscription_delivery_in_progress',
244 ].forEach((table) => {
245 const result = this.db.prepare(`DELETE FROM ${table}`).run();
246 this.logger.debug(_fileScope('_purgeTables'), 'success', { table, result });
247 });
248 }
249 }
250
251
252 context(fn) {
253 return fn(this.db);
254 }
255
256
257 transaction(dbCtx, fn) {
258 dbCtx = dbCtx || this.db;
259 return dbCtx.transaction(fn)();
260 }
261
262
263 authenticationSuccess(dbCtx, identifier) {
264 const _scope = _fileScope('authenticationSuccess');
265 this.logger.debug(_scope, 'called', { identifier });
266
267 let result;
268 try {
269 result = this.statement.authenticationSuccess.run({ identifier });
270 if (result.changes != 1) {
271 throw new DBErrors.UnexpectedResult('did not update authentication success');
272 }
273 } catch (e) {
274 this.logger.error(_scope, 'failed', { error: e, identifier });
275 throw e;
276 }
277 }
278
279
280 authenticationGet(dbCtx, identifier) {
281 const _scope = _fileScope('authenticationGet');
282 this.logger.debug(_scope, 'called', { identifier });
283
284 try {
285 return this.statement.authenticationGet.get({ identifier });
286 } catch (e) {
287 this.logger.error(_scope, 'failed', { error: e, identifier });
288 throw e;
289 }
290 }
291
292
293 authenticationUpsert(dbCtx, identifier, credential, otpKey) {
294 const _scope = _fileScope('authenticationUpsert');
295 const scrubbedCredential = '*'.repeat((credential || '').length);
296 const scrubbedOTPKey = '*'.repeat((otpKey || '').length) || null;
297 this.logger.debug(_scope, 'called', { identifier, scrubbedCredential, scrubbedOTPKey });
298
299 let result;
300 try {
301 result = this.statement.authenticationUpsert.run({ identifier, credential, otpKey });
302 if (result.changes != 1) {
303 throw new DBErrors.UnexpectedResult('did not upsert authentication');
304 }
305 } catch (e) {
306 this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential, scrubbedOTPKey });
307 throw e;
308 }
309 }
310
311
312 authenticationUpdateOTPKey(dbCtx, identifier, otpKey) {
313 const _scope = _fileScope('authenticationUpdateOTPKey');
314 const scrubbedOTPKey = '*'.repeat((otpKey || '').length) || null;
315 this.logger.debug(_scope, 'called', { identifier, scrubbedOTPKey });
316
317 let result;
318 try {
319 result = this.statement.authenticationUpdateOtpKey.run({ identifier, otpKey });
320 if (result.changes != 1) {
321 throw new DBErrors.UnexpectedResult('did not update authentication otp key');
322 }
323 } catch (e) {
324 this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedOTPKey });
325 throw e;
326 }
327 }
328
329
330 authenticationUpdateCredential(dbCtx, identifier, credential) {
331 const _scope = _fileScope('authenticationUpdateCredential');
332 const scrubbedCredential = '*'.repeat((credential || '').length);
333 this.logger.debug(_scope, 'called', { identifier, scrubbedCredential });
334
335 let result;
336 try {
337 result = this.statement.authenticationUpdateCredential.run({ identifier, credential });
338 if (result.changes != 1) {
339 throw new DBErrors.UnexpectedResult('did not update authentication credential');
340 }
341 } catch (e) {
342 this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential });
343 throw e;
344 }
345 }
346
347
348 /**
349 * Converts engine subscription fields to native types.
350 * @param {object} data subscription data
351 * @returns {object} data
352 */
353 static _subscriptionDataToNative(data) {
354 if (data) {
355 ['created', 'verified', 'expires', 'contentDelivered'].forEach((field) => {
356 // eslint-disable-next-line security/detect-object-injection
357 data[field] = epochToDate(data[field]);
358 });
359 }
360 return data;
361 }
362
363
364 subscriptionsByTopicId(dbCtx, topicId) {
365 const _scope = _fileScope('subscriptionsByTopicId');
366 this.logger.debug(_scope, 'called', { topicId });
367
368 try {
369 const subscriptions = this.statement.subscriptionsByTopicId.all({ topicId });
370 return subscriptions.map((s) => DatabaseSQLite._subscriptionDataToNative(s));
371 } catch (e) {
372 this.logger.error(_scope, 'failed', { error: e, topicId });
373 throw e;
374 }
375 }
376
377
378 subscriptionCountByTopicUrl(dbCtx, topicUrl) {
379 const _scope = _fileScope('subscriptionCountByTopicUrl');
380 this.logger.debug(_scope, 'called', { topicUrl });
381
382 try {
383 return this.statement.subscriptionCountByTopicUrl.get({ topicUrl });
384 } catch (e) {
385 this.logger.error(_scope, 'failed', { error: e, topicUrl });
386 throw e;
387 }
388 }
389
390
391 subscriptionDelete(dbCtx, callback, topicId) {
392 const _scope = _fileScope('subscriptionDelete');
393 this.logger.debug(_scope, 'called', { callback, topicId });
394
395 try {
396 const result = this.statement.subscriptionDelete.run({ callback, topicId });
397 if (result.changes != 1) {
398 throw new DBErrors.UnexpectedResult('did not delete subscription');
399 }
400 return this._engineInfo(result);
401 } catch (e) {
402 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
403 throw e;
404 }
405 }
406
407
408 subscriptionDeleteExpired(dbCtx, topicId) {
409 const _scope = _fileScope('subscriptionDeleteExpired');
410 this.logger.debug(_scope, 'called', { topicId });
411
412 try {
413 const result = this.statement.subscriptionDeleteExpired.run({ topicId });
414 this.logger.debug(_scope, 'success', { topicId, deleted: result.changes });
415 return this._engineInfo(result);
416 } catch (e) {
417 this.logger.error(_scope, 'failed', { error: e, topicId });
418 throw e;
419 }
420 }
421
422
423 subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
424 const _scope = _fileScope('subscriptionDeliveryClaim');
425 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
426
427 let subscriptionIds;
428 try {
429 this.db.transaction(() => {
430 subscriptionIds = this.statement.subscriptionDeliveryNeeded.all({ wanted }).map((claim) => claim.id);
431 subscriptionIds.forEach((subscriptionId) => {
432 const result = this.statement.subscriptionDeliveryClaimById.run({ subscriptionId, claimTimeoutSeconds, claimant });
433 if (result.changes != 1) {
434 throw new DBErrors.UnexpectedResult('did not claim subscription delivery');
435 }
436 });
437 })();
438 return subscriptionIds;
439 } catch (e) {
440 this.logger.error(_scope, 'failed', { error: e, wanted, claimTimeoutSeconds, claimant, subscriptionIds });
441 throw e;
442 }
443 }
444
445
446 subscriptionDeliveryClaimById(dbCtx, subscriptionId, claimTimeoutSeconds, claimant) {
447 const _scope = _fileScope('subscriptionDeliveryClaimById');
448 this.logger.debug(_scope, 'called', { subscriptionId, claimTimeoutSeconds, claimant });
449
450 try {
451 const result = this.statement.subscriptionDeliveryClaimById.run({ subscriptionId, claimTimeoutSeconds, claimant });
452 if (result.changes != 1) {
453 throw new DBErrors.UnexpectedResult('did not claim subscription delivery');
454 }
455 return this._engineInfo(result);
456 } catch (e) {
457 this.logger.error(_scope, 'failed', { error: e, subscriptionId, claimTimeoutSeconds, claimant });
458 throw e;
459 }
460 }
461
462
463 subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) {
464 const _scope = _fileScope('subscriptionDeliveryComplete');
465 this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated });
466
467 let result;
468 try {
469 this.db.transaction(() => {
470 topicContentUpdated = dateToEpoch(topicContentUpdated);
471 result = this.statement.subscriptionDeliverySuccess.run({ callback, topicId, topicContentUpdated });
472 if (result.changes != 1) {
473 throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
474 }
475 result = this.statement.subscriptionDeliveryDone.run({ callback, topicId });
476 if (result.changes != 1) {
477 throw new DBErrors.UnexpectedResult('did not complete subscription delivery');
478 }
479 })();
480 return this._engineInfo(result);
481 } catch (e) {
482 this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated });
483 throw e;
484 }
485 }
486
487
488 subscriptionDeliveryGone(dbCtx, callback, topicId) {
489 const _scope = _fileScope('subscriptionDeliveryGone');
490 this.logger.debug(_scope, 'called', { callback, topicId });
491
492 let result;
493 try {
494 this.db.transaction(() => {
495 result = this.statement.subscriptionDelete.run({ callback, topicId });
496 if (result.changes != 1) {
497 throw new DBErrors.UnexpectedResult('did not delete subscription');
498 }
499 // Delete cascades to delivery
500 // result = this.statement.subscriptionDeliveryDone.run({ callback, topicId });
501 // if (result.changes != 1) {
502 // throw new DBErrors.UnexpectedResult('did not complete subscription delivery');
503 // }
504 })();
505 return this._engineInfo(result);
506 } catch (e) {
507 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
508 throw e;
509 }
510 }
511
512
513 subscriptionDeliveryIncomplete(dbCtx, callback, topicId, retryDelays = [60]) {
514 const _scope = _fileScope('subscriptionDeliveryIncomplete');
515 this.logger.debug(_scope, 'called', { callback, topicId, retryDelays });
516
517 let result;
518 try {
519 this.db.transaction(() => {
520 const { currentAttempt } = this.statement.subscriptionDeliveryAttempts.get({ callback, topicId });
521 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
522 result = this.statement.subscriptionDeliveryFailure.run({ nextAttemptDelaySeconds, callback, topicId });
523 if (result.changes != 1) {
524 throw new DBErrors.UnexpectedResult('did not set delivery failure');
525 }
526 result = this.statement.subscriptionDeliveryDone.run({ callback, topicId });
527 if (result.changes != 1) {
528 throw new DBErrors.UnexpectedResult('did not complete subscription delivery');
529 }
530 })();
531 } catch (e) {
532 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
533 throw e;
534 }
535 }
536
537
538 subscriptionGet(dbCtx, callback, topicId) {
539 const _scope = _fileScope('subscriptionGet');
540 this.logger.debug(_scope, 'called', { callback, topicId });
541
542 let subscription;
543 try {
544 subscription = this.statement.subscriptionGet.get({ callback, topicId });
545 return DatabaseSQLite._subscriptionDataToNative(subscription);
546 } catch (e) {
547 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
548 throw e;
549 }
550 }
551
552
553 subscriptionGetById(dbCtx, subscriptionId) {
554 const _scope = _fileScope('subscriptionGetById');
555 this.logger.debug(_scope, 'called', { subscriptionId });
556
557 let subscription;
558 try {
559 subscription = this.statement.subscriptionGetById.get({ subscriptionId });
560 return DatabaseSQLite._subscriptionDataToNative(subscription);
561 } catch (e) {
562 this.logger.error(_scope, 'failed', { error: e, subscriptionId });
563 throw e;
564 }
565 }
566
567
568 subscriptionUpdate(dbCtx, data) {
569 const _scope = _fileScope('subscriptionUpdate');
570 this.logger.debug(_scope, 'called', { data });
571
572 const subscriptionData = {
573 ...data,
574 };
575
576 this._subscriptionUpdateDataValidate(subscriptionData);
577
578 try {
579 const result = this.statement.subscriptionUpdate.run(subscriptionData);
580 if (result.changes != 1) {
581 throw new DBErrors.UnexpectedResult('did not update subscription');
582 }
583 } catch (e) {
584 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
585 throw e;
586 }
587 }
588
589
590 subscriptionUpsert(dbCtx, data) {
591 const _scope = _fileScope('subscriptionUpsert');
592 this.logger.debug(_scope, 'called', { ...data });
593
594 const subscriptionData = {
595 secret: null,
596 httpRemoteAddr: null,
597 httpFrom: null,
598 ...data,
599 };
600 this._subscriptionUpsertDataValidate(subscriptionData);
601
602 let result;
603 try {
604 result = this.statement.subscriptionUpsert.run(subscriptionData);
605 if (result.changes != 1) {
606 throw new DBErrors.UnexpectedResult('did not upsert subscription');
607 }
608 return this._engineInfo(result);
609 } catch (e) {
610 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
611 throw e;
612 }
613 }
614
615
616 topicDeleted(dbCtx, topicId) {
617 const _scope = _fileScope('topicDeleted');
618 this.logger.debug(_scope, 'called', { topicId });
619
620 let result;
621 try {
622 result = this.statement.topicDeleted.run({ topicId });
623 if (result.changes != 1) {
624 throw new DBErrors.UnexpectedResult('did not update topic as deleted');
625 }
626 } catch (e) {
627 this.logger.error(_scope, 'failed', { error: e, topicId });
628 throw e;
629 }
630 }
631
632
633 topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
634 const _scope = _fileScope('topicFetchClaim');
635 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
636
637 let topicIds;
638 try {
639 this.db.transaction(() => {
640 topicIds = this.statement.topicContentFetchNeeded.all({ wanted }).map((claim) => claim.id);
641 topicIds.forEach((topicId) => {
642 const result = this.statement.topicContentFetchClaimById.run({ topicId, claimTimeoutSeconds, claimant });
643 if (result.changes != 1) {
644 throw new DBErrors.UnexpectedResult('did not claim topic fetch');
645 }
646 });
647 })();
648 return topicIds;
649 } catch (e) {
650 this.logger.error(_scope, 'failed to claim topics for fetch', { error: e, wanted, claimTimeoutSeconds, claimant, topicIds });
651 throw e;
652 }
653 }
654
655
656 topicFetchClaimById(dbCtx, topicId, claimTimeoutSeconds, claimant) {
657 const _scope = _fileScope('topicFetchClaimById');
658 this.logger.debug(_scope, 'called', { topicId, claimTimeoutSeconds, claimant });
659
660 let result;
661 try {
662 result = this.statement.topicContentFetchClaimById.run({ topicId, claimTimeoutSeconds, claimant });
663 if (result.changes != 1) {
664 throw new DBErrors.UnexpectedResult('did not claim topic fetch');
665 }
666 return this._engineInfo(result);
667 } catch (e) {
668 this.logger.error(_scope, 'failed to claim topics for fetch', { error: e, topicId, claimTimeoutSeconds, claimant });
669 throw e;
670 }
671 }
672
673
674 topicFetchComplete(dbCtx, topicId) {
675 const _scope = _fileScope('topicFetchComplete');
676 this.logger.debug(_scope, 'called', { topicId });
677
678 let result;
679 try {
680 this.db.transaction(() => {
681 result = this.statement.topicAttemptsReset.run({ topicId, forever: EPOCH_FOREVER });
682 if (result.changes != 1) {
683 throw new DBErrors.UnexpectedResult('did not reset topic attempts');
684 }
685 result = this.statement.topicContentFetchDone.run({ topicId });
686 if (result.changes != 1) {
687 throw new DBErrors.UnexpectedResult('did not release topic fetch');
688 }
689 })();
690 return this._engineInfo(result);
691 } catch (e) {
692 this.logger.error(_scope, 'failed', { error: e, result, topicId });
693 throw e;
694 }
695 }
696
697
698 topicFetchIncomplete(dbCtx, topicId, retryDelays = [60]) {
699 const _scope = _fileScope('topicFetchIncomplete');
700 this.logger.debug(_scope, 'called', { topicId });
701
702 let result;
703 try {
704 this.db.transaction(() => {
705 const { contentFetchAttemptsSinceSuccess: currentAttempt } = this.statement.topicAttempts.get({ topicId });
706 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
707 result = this.statement.topicAttemptsIncrement.run({ topicId, nextAttemptDelaySeconds });
708 if (result.changes != 1) {
709 throw new DBErrors.UnexpectedResult('did not set topic attempts');
710 }
711 result = this.statement.topicContentFetchDone.run({ topicId });
712 if (result.changes != 1) {
713 throw new DBErrors.UnexpectedResult('did not release topic fetch');
714 }
715 return result;
716 })();
717 return this._engineInfo(result);
718 } catch (e) {
719 this.logger.error(_scope, 'failed', { error: e, result, topicId });
720 throw e;
721 }
722 }
723
724
725 topicFetchRequested(dbCtx, topicId) {
726 const _scope = _fileScope('topicFetchRequested');
727 this.logger.debug(_scope, 'called', { topicId });
728
729 let result;
730 try {
731 result = this.statement.topicContentFetchRequested.run({ topicId });
732 if (result.changes != 1) {
733 throw new DBErrors.UnexpectedResult('did not set topic fetch requested');
734 }
735 return this._engineInfo(result);
736 } catch (e) {
737 this.logger.error(_scope, 'failed', { error: e, topicId });
738 throw e;
739 }
740 }
741
742
743 /**
744 * Converts engine topic fields to native types.
745 * @param {object} data topic
746 * @returns {object} topic data
747 */
748 static _topicDataToNative(data) {
749 if (data) {
750 data.isActive = !!data.isActive;
751 data.isDeleted = !!data.isDeleted;
752 ['created', 'lastPublish', 'contentFetchNextAttempt', 'contentUpdated'].forEach((field) => {
753 // eslint-disable-next-line security/detect-object-injection
754 data[field] = epochToDate(data[field]);
755 });
756 }
757 return data;
758 }
759
760
761 // eslint-disable-next-line no-unused-vars
762 topicGetAll(dbCtx) {
763 const _scope = _fileScope('topicGetAll');
764 this.logger.debug(_scope, 'called');
765
766 let topics;
767 try {
768 topics = this.statement.topicGetInfoAll.all();
769 } catch (e) {
770 this.logger.error(_scope, 'failed', { error: e, topics });
771 throw e;
772 }
773 if (topics) {
774 topics = topics
775 .map(DatabaseSQLite._topicDataToNative)
776 .map(this._topicDefaults.bind(this));
777 }
778 return topics;
779 }
780
781
782 topicGetById(dbCtx, topicId, applyDefaults = true) {
783 const _scope = _fileScope('topicGetById');
784 this.logger.debug(_scope, 'called', { topicId });
785
786 let topic;
787 try {
788 topic = this.statement.topicGetById.get({ topicId });
789 DatabaseSQLite._topicDataToNative(topic);
790 if (applyDefaults) {
791 topic = this._topicDefaults(topic);
792 }
793 return topic;
794 } catch (e) {
795 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
796 throw e;
797 }
798 }
799
800
801 topicGetByUrl(dbCtx, topicUrl, applyDefaults = true) {
802 const _scope = _fileScope('topicGetByUrl');
803 this.logger.debug(_scope, 'called', { topicUrl });
804
805 let topic;
806 try {
807 topic = this.statement.topicGetByUrl.get({ topicUrl });
808 DatabaseSQLite._topicDataToNative(topic);
809 if (applyDefaults) {
810 topic = this._topicDefaults(topic);
811 }
812 return topic;
813 } catch (e) {
814 this.logger.error(_scope, 'failed', { error: e, topic, topicUrl });
815 throw e;
816 }
817 }
818
819
820 topicGetContentById(dbCtx, topicId) {
821 const _scope = _fileScope('topicGetContentById');
822 this.logger.debug(_scope, 'called', { topicId });
823
824 let topic;
825 try {
826 topic = this.statement.topicGetContentById.get({ topicId });
827 DatabaseSQLite._topicDataToNative(topic);
828 return this._topicDefaults(topic);
829 } catch (e) {
830 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
831 throw e;
832 }
833 }
834
835
836 topicPendingDelete(dbCtx, topicId) {
837 const _scope = _fileScope('topicPendingDelete');
838 this.logger.debug(_scope, 'called', { topicId });
839
840 try {
841 this.db.transaction(() => {
842 const topic = this.statement.topicGetById.get({ topicId });
843 if (!topic.isDeleted) {
844 this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
845 return;
846 }
847
848 const { count: subscriberCount } = this.statement.subscriptionCountByTopicUrl.get({ topicUrl: topic.url });
849 if (subscriberCount) {
850 this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
851 return;
852 }
853
854 const result = this.statement.topicDeleteById.run({ topicId });
855 if (result.changes !== 1) {
856 throw new DBErrors.UnexpectedResult('did not delete topic');
857 }
858 })();
859 this.logger.debug(_scope, 'success', { topicId });
860 } catch (e) {
861 this.logger.error(_scope, 'failed', { error: e, topicId });
862 throw e;
863 }
864 }
865
866
867 topicPublishHistory(dbCtx, topicId, days) {
868 const _scope = _fileScope('topicPublishHistory');
869 this.logger.debug(_scope, 'called', { topicId, days });
870
871 const events = this.statement.topicPublishHistory.all({ topicId, daysAgo: days });
872 const history = Array.from({ length: days }, () => 0);
873 // eslint-disable-next-line security/detect-object-injection
874 events.forEach(({ daysAgo, contentUpdates }) => history[daysAgo] = Number(contentUpdates));
875
876 return history;
877 }
878
879
880 topicSet(dbCtx, data) {
881 const _scope = _fileScope('topicSet');
882 this.logger.debug(_scope, 'called', data);
883
884 const topicSetData = {
885 publisherValidationUrl: null,
886 leaseSecondsPreferred: null,
887 leaseSecondsMin: null,
888 leaseSecondsMax: null,
889 ...data,
890 };
891
892 let result;
893 try {
894 this._topicSetDataValidate(topicSetData);
895 result = this.statement.topicUpsert.run(topicSetData);
896 if (result.changes != 1) {
897 throw new DBErrors.UnexpectedResult('did not set topic data');
898 }
899 return this._engineInfo(result);
900 } catch (e) {
901 this.logger.error(_scope, 'failed', { error: e, result });
902 throw e;
903 }
904 }
905
906
907 topicSetContent(dbCtx, data) {
908 const _scope = _fileScope('topicSetContent');
909 const topicSetContentData = {
910 contentType: null,
911 httpETag: null,
912 httpLastModified: null,
913 ...data,
914 };
915 const logData = {
916 ...topicSetContentData,
917 content: common.logTruncate(topicSetContentData.content, 100),
918 };
919 this.logger.debug(_scope, 'called', logData);
920
921 let result;
922 try {
923 this._topicSetContentDataValidate(topicSetContentData);
924 result = this.statement.topicSetContent.run(topicSetContentData);
925 logData.result = result;
926 if (result.changes != 1) {
927 throw new DBErrors.UnexpectedResult('did not set topic content');
928 }
929 result = this.statement.topicSetContentHistory.run({
930 topicId: data.topicId,
931 contentHash: data.contentHash,
932 contentSize: data.content.length,
933 });
934 if (result.changes != 1) {
935 throw new DBErrors.UnexpectedResult('did not set topic content history');
936 }
937 return this._engineInfo(result);
938 } catch (e) {
939 this.logger.error(_scope, 'failed', { error: e, ...logData });
940 throw e;
941 }
942 }
943
944
945 topicUpdate(dbCtx, data) {
946 const _scope = _fileScope('topicUpdate');
947 this.logger.debug(_scope, 'called', { data });
948
949 const topicData = {
950 leaseSecondsPreferred: null,
951 leaseSecondsMin: null,
952 leaseSecondsMax: null,
953 publisherValidationUrl: null,
954 ...data,
955 };
956
957 this._topicUpdateDataValidate(topicData);
958
959 try {
960 const result = this.statement.topicUpdate.run(topicData);
961 if (result.changes != 1) {
962 throw new DBErrors.UnexpectedResult('did not update topic');
963 }
964 } catch (e) {
965 this.logger.error(_scope, 'failed', { error: e, topicData });
966 throw e;
967 }
968 }
969
970
971 verificationClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
972 const _scope = _fileScope('verificationClaim');
973 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
974
975 let verificationIds;
976 try {
977 this.db.transaction(() => {
978 verificationIds = this.statement.verificationNeeded.all({ wanted }).map((claim) => claim.id);
979 verificationIds.forEach((verificationId) => {
980 const result = this.statement.verificationClaimById.run({ verificationId, claimTimeoutSeconds, claimant });
981 if (result.changes != 1) {
982 throw new DBErrors.UnexpectedResult('did not claim verification');
983 }
984 });
985 })();
986 return verificationIds;
987 } catch (e) {
988 this.logger.error(_scope, 'failed to claim verifications', { wanted, claimTimeoutSeconds });
989 throw e;
990 }
991 }
992
993
994 verificationClaimById(dbCtx, verificationId, claimTimeoutSeconds, claimant) {
995 const _scope = _fileScope('verificationClaimById');
996 this.logger.debug(_scope, 'called', { verificationId, claimTimeoutSeconds, claimant });
997
998 let result;
999 try {
1000 result = this.statement.verificationClaimById.run({ verificationId, claimTimeoutSeconds, claimant });
1001 if (result.changes != 1) {
1002 throw new DBErrors.UnexpectedResult('did not claim verification');
1003 }
1004 return this._engineInfo(result);
1005 } catch (e) {
1006 this.logger.error(_scope, 'failed to claim verification', { error: e, verificationId, claimTimeoutSeconds, claimant });
1007 throw e;
1008 }
1009 }
1010
1011
1012 verificationComplete(dbCtx, verificationId, callback, topicId) {
1013 const _scope = _fileScope('verificationComplete');
1014 this.logger.debug(_scope, 'called', { verificationId });
1015
1016 let result;
1017 try {
1018 this.db.transaction(() => {
1019 result = this.statement.verificationScrub.run({ verificationId, callback, topicId });
1020 if (result.changes < 1) {
1021 throw new DBErrors.UnexpectedResult('did not remove verifications');
1022 }
1023 })();
1024 } catch (e) {
1025 this.logger.error(_scope, 'failed', { verificationId });
1026 throw e;
1027 }
1028 return this._engineInfo(result);
1029 }
1030
1031
1032 /**
1033 * Converts engine verification fields to native types.
1034 * @param {object} data verification
1035 */
1036 static _verificationDataToNative(data) {
1037 if (data) {
1038 data.isPublisherValidated = !!data.isPublisherValidated;
1039 }
1040 }
1041
1042
1043 verificationGetById(dbCtx, verificationId) {
1044 const _scope = _fileScope('verificationGetById');
1045 this.logger.debug(_scope, 'called', { verificationId });
1046
1047 let verification;
1048 try {
1049 verification = this.statement.verificationGetById.get({ verificationId });
1050 DatabaseSQLite._verificationDataToNative(verification);
1051 return verification;
1052 } catch (e) {
1053 this.logger.error(_scope, 'failed', { error: e, verificationId });
1054 throw e;
1055 }
1056 }
1057
1058
1059 verificationIncomplete(dbCtx, verificationId, retryDelays = [60]) {
1060 const _scope = _fileScope('verificationIncomplete');
1061 this.logger.debug(_scope, 'called', { verificationId });
1062
1063 let result;
1064 try {
1065 this.db.transaction(() => {
1066 const { attempts: currentAttempt } = this.statement.verificationAttempts.get({ verificationId });
1067 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
1068 result = this.statement.verificationAttemptsIncrement.run({ verificationId, nextAttemptDelaySeconds });
1069 if (result.changes != 1) {
1070 throw new DBErrors.UnexpectedResult('did not increment verification attempts');
1071 }
1072 result = this.statement.verificationDone.run({ verificationId });
1073 if (result.changes != 1) {
1074 throw new DBErrors.UnexpectedResult('did not release verification in progress');
1075 }
1076 return result;
1077 })();
1078 return this._engineInfo(result);
1079 } catch (e) {
1080 this.logger.error(_scope, 'failed', { error: e, result, verificationId });
1081 throw e;
1082 }
1083 }
1084
1085
1086 /**
1087 * Convert native verification fields to engine types.
1088 * @param {object} data verification
1089 */
1090 static _verificationDataToEngine(data) {
1091 if (data) {
1092 data.isPublisherValidated = data.isPublisherValidated ? 1 : 0;
1093 }
1094 }
1095
1096
1097 verificationInsert(dbCtx, verification) {
1098 const _scope = _fileScope('verificationInsert');
1099 this.logger.debug(_scope, 'called', { verification });
1100
1101 const verificationData = {
1102 secret: null,
1103 httpRemoteAddr: null,
1104 httpFrom: null,
1105 requestId: null,
1106 ...verification,
1107 };
1108
1109 let result, verificationId;
1110 try {
1111 this._verificationDataValidate(verificationData);
1112 DatabaseSQLite._verificationDataToEngine(verificationData);
1113 result = this.statement.verificationInsert.run(verificationData);
1114 if (result.changes != 1) {
1115 throw new DBErrors.UnexpectedResult('did not insert verification');
1116 }
1117 verificationId = result.lastInsertRowid;
1118 this.logger.debug(_scope, 'inserted verification', { verificationId });
1119
1120 return verificationId;
1121 } catch (e) {
1122 this.logger.error(_scope, 'failed', { error: e, verificationData });
1123 throw e;
1124 }
1125 }
1126
1127
1128 verificationRelease(dbCtx, verificationId) {
1129 const _scope = _fileScope('verificationRelease');
1130 this.logger.debug(_scope, 'called', { verificationId });
1131
1132 let result;
1133 try {
1134 result = this.statement.verificationDone.run({ verificationId });
1135 if (result.changes != 1) {
1136 throw new DBErrors.UnexpectedResult('did not release verification');
1137 }
1138 } catch (e) {
1139 this.logger.error(_scope, 'failed', { error: e, verificationId });
1140 throw e;
1141 }
1142 }
1143
1144
1145 verificationUpdate(dbCtx, verificationId, data) {
1146 const _scope = _fileScope('verificationUpdate');
1147 this.logger.debug(_scope, 'called', { verificationId, data });
1148
1149 const verificationData = {
1150 reason: null,
1151 verificationId,
1152 ...data,
1153 };
1154
1155 let result;
1156 try {
1157 this._verificationUpdateDataValidate(verificationData);
1158 DatabaseSQLite._verificationDataToEngine(verificationData);
1159 result = this.statement.verificationUpdate.run(verificationData);
1160 if (result.changes != 1) {
1161 throw new DBErrors.UnexpectedResult('did not update verification');
1162 }
1163 } catch (e) {
1164 this.logger.error(_scope, 'failed', { error: e, verificationData });
1165 throw e;
1166 }
1167 }
1168
1169
1170 verificationValidated(dbCtx, verificationId) {
1171 const _scope = _fileScope('verificationValidated');
1172 this.logger.debug(_scope, 'called', { verificationId });
1173
1174 let result;
1175 try {
1176 result = this.statement.verificationValidate.run({ verificationId });
1177 if (result.changes != 1) {
1178 throw new DBErrors.UnexpectedResult('did not set verification validation');
1179 }
1180 } catch (e) {
1181 this.logger.error(_scope, 'failed', { error: e, verificationId });
1182 throw e;
1183 }
1184 }
1185
1186
1187 }
1188
1189 module.exports = DatabaseSQLite;