07d6633311113dcdaf2eae05dabcc912f94fdece
[websub-hub] / src / db / sqlite / index.js
1 'use strict';
2
3 const common = require('../../common');
4 const Database = require('../base');
5 const DBErrors = require('../errors');
6 const svh = require('../schema-version-helper');
7 const SQLite = require('better-sqlite3');
8 const fs = require('fs');
9 const path = require('path');
10 const { performance } = require('perf_hooks');
11
12 const _fileScope = common.fileScope(__filename);
13
14 const schemaVersionsSupported = {
15 min: {
16 major: 1,
17 minor: 0,
18 patch: 0,
19 },
20 max: {
21 major: 1,
22 minor: 0,
23 patch: 1,
24 },
25 };
26
27 // max of signed int64 (2^63 - 1), should be enough
28 const EPOCH_FOREVER = BigInt('9223372036854775807');
29
30 class DatabaseSQLite extends Database {
31 constructor(logger, options) {
32 super(logger, options);
33
34 const connectionString = options.db.connectionString || 'sqlite://:memory:';
35 const csDelim = '://';
36 const dbFilename = connectionString.slice(connectionString.indexOf(csDelim) + csDelim.length);
37
38 const queryLogLevel = options.db.queryLogLevel;
39
40 const sqliteOptions = {
41 ...(queryLogLevel && {
42 // eslint-disable-next-line security/detect-object-injection
43 verbose: (query) => this.logger[queryLogLevel](_fileScope('SQLite:verbose'), '', { query }),
44 }),
45 };
46 this.db = new SQLite(dbFilename, sqliteOptions);
47 this.schemaVersionsSupported = schemaVersionsSupported;
48 this.changesSinceLastOptimize = BigInt(0);
49 this.optimizeAfterChanges = options.db.connectionString.optimizeAfterChanges;
50 this.db.pragma('foreign_keys = on'); // Enforce consistency.
51 this.db.pragma('journal_mode = WAL'); // Be faster, expect local filesystem.
52 this.db.defaultSafeIntegers(true); // This probably isn't necessary, but by using these BigInts we keep weird floats out of the query logs.
53
54 this._initTables();
55 this._initStatements();
56 }
57
58
59 /**
60 * SQLite cannot prepare its statements without a schema, ensure such exists.
61 */
62 _initTables() {
63 const _scope = _fileScope('_initTables');
64
65 // Migrations rely upon this table, ensure it exists.
66 const metaVersionTable = '_meta_schema_version';
67 const tableExists = this.db.prepare('SELECT name FROM sqlite_master WHERE type=:type AND name=:name').pluck(true).bind({ type: 'table', name: metaVersionTable });
68 let metaExists = tableExists.get();
69 if (metaExists === undefined) {
70 const fPath = path.join(__dirname, 'sql', 'schema', 'init.sql');
71 // eslint-disable-next-line security/detect-non-literal-fs-filename
72 const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
73 this.db.exec(fSql);
74 metaExists = tableExists.get();
75 /* istanbul ignore if */
76 if (metaExists === undefined) {
77 throw new DBErrors.UnexpectedResult(`did not create ${metaVersionTable} table`);
78 }
79 this.logger.info(_scope, 'created schema version table', { metaVersionTable });
80 }
81
82 // Apply migrations
83 const currentSchema = this._currentSchema();
84 const migrationsWanted = svh.unappliedSchemaVersions(__dirname, currentSchema, this.schemaVersionsSupported);
85 this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
86 migrationsWanted.forEach((v) => {
87 const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
88 // eslint-disable-next-line security/detect-non-literal-fs-filename
89 const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
90 this.logger.info(_scope, 'applying migration', { version: v });
91 this.db.exec(fSql);
92 });
93 }
94
95
96 _initStatements() {
97 const _scope = _fileScope('_initStatements');
98 const sqlDir = path.join(__dirname, 'sql');
99 this.statement = {};
100
101 // Decorate the statement calls we use with timing and logging.
102 const wrapFetch = (logName, statementName, fn) => {
103 const _wrapScope = _fileScope(logName);
104 return (...args) => {
105 const startTimestampMs = performance.now();
106 const rows = fn(...args);
107 DatabaseSQLite._deOphidiate(rows);
108 const elapsedTimeMs = performance.now() - startTimestampMs;
109 this.logger.debug(_wrapScope, 'complete', { statementName, elapsedTimeMs });
110 return rows;
111 };
112 };
113 const wrapRun = (logName, statementName, fn) => {
114 const _wrapScope = _fileScope(logName);
115 return (...args) => {
116 const startTimestampMs = performance.now();
117 const result = fn(...args);
118 const elapsedTimeMs = performance.now() - startTimestampMs;
119 this.logger.debug(_wrapScope, 'complete', { ...result, statementName, elapsedTimeMs });
120 result.duration = elapsedTimeMs;
121 return result;
122 };
123 };
124
125 // eslint-disable-next-line security/detect-non-literal-fs-filename
126 for (const f of fs.readdirSync(sqlDir)) {
127 const fPath = path.join(sqlDir, f);
128 const { name: fName, ext: fExt } = path.parse(f);
129 // eslint-disable-next-line security/detect-non-literal-fs-filename
130 const stat = fs.statSync(fPath);
131 if (!stat.isFile()
132 || fExt.toLowerCase() !== '.sql') {
133 continue;
134 }
135 // eslint-disable-next-line security/detect-non-literal-fs-filename
136 const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
137 const statementName = Database._camelfy(fName.toLowerCase(), '-');
138 let statement;
139 try {
140 statement = this.db.prepare(fSql);
141 } catch (e) {
142 /* istanbul ignore next */
143 this.logger.error(_scope, 'failed to prepare statement', { error: e, file: f });
144 /* istanbul ignore next */
145 throw e;
146 }
147 // eslint-disable-next-line security/detect-object-injection
148 this.statement[statementName] = statement;
149 const { get: origGet, all: origAll, run: origRun } = statement;
150 statement.get = wrapFetch('SQLite:get', statementName, origGet.bind(statement));
151 statement.all = wrapFetch('SQLite:all', statementName, origAll.bind(statement));
152 statement.run = wrapRun('SQLite:run', statementName, origRun.bind(statement));
153 }
154 this.statement._optimize = this.db.prepare('SELECT * FROM pragma_optimize(0x03)');
155
156 this.logger.debug(_scope, 'statements initialized', { statements: Object.keys(this.statement).length });
157 }
158
159
160 static _deOphidiate(rows) {
161 const rowsIsArray = Array.isArray(rows);
162 if (!rowsIsArray) {
163 rows = [rows];
164 }
165 const exemplaryRow = rows[0];
166 for (const prop in exemplaryRow) {
167 const camel = Database._camelfy(prop);
168 if (!(camel in exemplaryRow)) {
169 for (const d of rows) {
170 // eslint-disable-next-line security/detect-object-injection
171 d[camel] = d[prop];
172 // eslint-disable-next-line security/detect-object-injection
173 delete d[prop];
174 }
175 }
176 }
177 return rowsIsArray ? rows : rows[0];
178 }
179
180
181 _currentSchema() {
182 return this.db.prepare('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1').get();
183 }
184
185
186 healthCheck() {
187 const _scope = _fileScope('healthCheck');
188 this.logger.debug(_scope, 'called', {});
189 if (!this.db.open) {
190 throw new DBErrors.UnexpectedResult('database is not open');
191 }
192 return { open: this.db.open };
193 }
194
195
196 _engineInfo(result) {
197 if (result.changes) {
198 this.changesSinceLastOptimize += BigInt(result.changes);
199 this._optimize();
200 }
201 return {
202 changes: Number(result.changes),
203 lastInsertRowid: result.lastInsertRowid,
204 };
205 }
206
207
208 _closeConnection() {
209 this.db.close();
210 }
211
212
213 _optimize() {
214 const _scope = _fileScope('_optimize');
215
216 if (this.optimizeAfterChanges
217 && this.changesSinceLastOptimize >= this.optimizeAfterChanges) {
218 const optimize = this.statement._optimize.all();
219 this.logger.debug(_scope, 'optimize', { optimize });
220 this.db.pragma('optimize');
221 this.changesSinceLastOptimize = BigInt(0);
222 }
223 }
224
225
226 _purgeTables(really) {
227 if (really) {
228 [
229 'topic',
230 'topic_fetch_in_progress',
231 'verification',
232 'verification_in_progress',
233 'subscription',
234 'subscription_delivery_in_progress',
235 ].map((table) => {
236 const result = this.db.prepare(`DELETE FROM ${table}`).run();
237 this.logger.debug(_fileScope('_purgeTables'), 'success', { table, result });
238 });
239 }
240 }
241
242
243 context(fn) {
244 return fn(this.db);
245 }
246
247
248 transaction(dbCtx, fn) {
249 dbCtx = dbCtx || this.db;
250 return dbCtx.transaction(fn)();
251 }
252
253
254 authenticationSuccess(dbCtx, identifier) {
255 const _scope = _fileScope('authenticationSuccess');
256 this.logger.debug(_scope, 'called', { identifier });
257
258 let result;
259 try {
260 result = this.statement.authenticationSuccess.run({ identifier });
261 if (result.changes != 1) {
262 throw new DBErrors.UnexpectedResult('did not update authentication success');
263 }
264 } catch (e) {
265 this.logger.error(_scope, 'failed', { error: e, identifier });
266 throw e;
267 }
268 }
269
270
271 authenticationGet(dbCtx, identifier) {
272 const _scope = _fileScope('authenticationGet');
273 this.logger.debug(_scope, 'called', { identifier });
274
275 try {
276 return this.statement.authenticationGet.get({ identifier });
277 } catch (e) {
278 this.logger.error(_scope, 'failed', { error: e, identifier });
279 throw e;
280 }
281 }
282
283
284 authenticationUpsert(dbCtx, identifier, credential) {
285 const _scope = _fileScope('authenticationUpsert');
286 const scrubbedCredential = '*'.repeat((credential || '').length);
287 this.logger.debug(_scope, 'called', { identifier, scrubbedCredential });
288
289 let result;
290 try {
291 result = this.statement.authenticationUpsert.run({ identifier, credential });
292 if (result.changes != 1) {
293 throw new DBErrors.UnexpectedResult('did not upsert authentication');
294 }
295 } catch (e) {
296 this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential })
297 throw e;
298 }
299 }
300
301
302 /**
303 * Converts engine subscription fields to native types.
304 * @param {Object} data
305 */
306 static _subscriptionDataToNative(data) {
307 const epochToDate = (epoch) => new Date(Number(epoch) * 1000);
308 if (data) {
309 ['created', 'verified', 'expires', 'contentDelivered'].forEach((field) => {
310 // eslint-disable-next-line security/detect-object-injection
311 data[field] = epochToDate(data[field]);
312 });
313 }
314 return data;
315 }
316
317
318 subscriptionsByTopicId(dbCtx, topicId) {
319 const _scope = _fileScope('subscriptionsByTopicId');
320 this.logger.debug(_scope, 'called', { topicId });
321
322 try {
323 const subscriptions = this.statement.subscriptionsByTopicId.all({ topicId });
324 return subscriptions.map((s) => DatabaseSQLite._subscriptionDataToNative(s));
325 } catch (e) {
326 this.logger.error(_scope, 'failed', { error: e, topicId });
327 throw e;
328 }
329 }
330
331
332 subscriptionCountByTopicUrl(dbCtx, topicUrl) {
333 const _scope = _fileScope('subscriptionCountByTopicUrl');
334 this.logger.debug(_scope, 'called', { topicUrl });
335
336 try {
337 return this.statement.subscriptionCountByTopicUrl.get({ topicUrl });
338 } catch (e) {
339 this.logger.error(_scope, 'failed', { error: e, topicUrl });
340 throw e;
341 }
342 }
343
344
345 subscriptionDelete(dbCtx, callback, topicId) {
346 const _scope = _fileScope('subscriptionDelete');
347 this.logger.debug(_scope, 'called', { callback, topicId });
348
349 try {
350 const result = this.statement.subscriptionDelete.run({ callback, topicId });
351 if (result.changes != 1) {
352 throw new DBErrors.UnexpectedResult('did not delete subscription');
353 }
354 return this._engineInfo(result);
355 } catch (e) {
356 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
357 throw e;
358 }
359 }
360
361
362 subscriptionDeleteExpired(dbCtx, topicId) {
363 const _scope = _fileScope('subscriptionDeleteExpired');
364 this.logger.debug(_scope, 'called', { topicId });
365
366 try {
367 const result = this.statement.subscriptionDeleteExpired.run({ topicId });
368 this.logger.debug(_scope, 'success', { topicId, deleted: result.changes });
369 return this._engineInfo(result);
370 } catch (e) {
371 this.logger.error(_scope, 'failed', { error: e, topicId });
372 throw e;
373 }
374 }
375
376
377 subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
378 const _scope = _fileScope('subscriptionDeliveryClaim');
379 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
380
381 let subscriptionIds;
382 try {
383 this.db.transaction(() => {
384 subscriptionIds = this.statement.subscriptionDeliveryNeeded.all({ wanted }).map((claim) => claim.id);
385 subscriptionIds.forEach((subscriptionId) => {
386 const result = this.statement.subscriptionDeliveryClaimById.run({ subscriptionId, claimTimeoutSeconds, claimant });
387 if (result.changes != 1) {
388 throw new DBErrors.UnexpectedResult('did not claim subscription delivery');
389 }
390 });
391 })();
392 return subscriptionIds;
393 } catch (e) {
394 this.logger.error(_scope, 'failed', { error: e, wanted, claimTimeoutSeconds, claimant, subscriptionIds });
395 throw e;
396 }
397 }
398
399
400 subscriptionDeliveryClaimById(dbCtx, subscriptionId, claimTimeoutSeconds, claimant) {
401 const _scope = _fileScope('subscriptionDeliveryClaimById');
402 this.logger.debug(_scope, 'called', { subscriptionId, claimTimeoutSeconds, claimant });
403
404 try {
405 const result = this.statement.subscriptionDeliveryClaimById.run({ subscriptionId, claimTimeoutSeconds, claimant });
406 if (result.changes != 1) {
407 throw new DBErrors.UnexpectedResult('did not claim subscription delivery');
408 }
409 return this._engineInfo(result);
410 } catch (e) {
411 this.logger.error(_scope, 'failed', { error: e, subscriptionId, claimTimeoutSeconds, claimant });
412 throw e;
413 }
414 }
415
416
417 subscriptionDeliveryComplete(dbCtx, callback, topicId) {
418 const _scope = _fileScope('subscriptionDeliveryComplete');
419 this.logger.debug(_scope, 'called', { callback, topicId });
420
421 let result;
422 try {
423 this.db.transaction(() => {
424 result = this.statement.subscriptionDeliverySuccess.run({ callback, topicId });
425 if (result.changes != 1) {
426 throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
427 }
428 result = this.statement.subscriptionDeliveryDone.run({ callback, topicId });
429 if (result.changes != 1) {
430 throw new DBErrors.UnexpectedResult('did not complete subscription delivery');
431 }
432 })();
433 return this._engineInfo(result);
434 } catch (e) {
435 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
436 throw e;
437 }
438 }
439
440
441 subscriptionDeliveryGone(dbCtx, callback, topicId) {
442 const _scope = _fileScope('subscriptionDeliveryGone');
443 this.logger.debug(_scope, 'called', { callback, topicId });
444
445 let result;
446 try {
447 this.db.transaction(() => {
448 result = this.statement.subscriptionDelete.run({ callback, topicId });
449 if (result.changes != 1) {
450 throw new DBErrors.UnexpectedResult('did not delete subscription');
451 }
452 // Delete cascades to delivery
453 // result = this.statement.subscriptionDeliveryDone.run({ callback, topicId });
454 // if (result.changes != 1) {
455 // throw new DBErrors.UnexpectedResult('did not complete subscription delivery');
456 // }
457 })();
458 return this._engineInfo(result);
459 } catch (e) {
460 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
461 throw e;
462 }
463 }
464
465
466 subscriptionDeliveryIncomplete(dbCtx, callback, topicId, retryDelays = [60]) {
467 const _scope = _fileScope('subscriptionDeliveryIncomplete');
468 this.logger.debug(_scope, 'called', { callback, topicId, retryDelays });
469
470 let result;
471 try {
472 this.db.transaction(() => {
473 const { currentAttempt } = this.statement.subscriptionDeliveryAttempts.get({ callback, topicId });
474 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
475 result = this.statement.subscriptionDeliveryFailure.run({ nextAttemptDelaySeconds, callback, topicId });
476 if (result.changes != 1) {
477 throw new DBErrors.UnexpectedResult('did not set delivery failure');
478 }
479 result = this.statement.subscriptionDeliveryDone.run({ callback, topicId });
480 if (result.changes != 1) {
481 throw new DBErrors.UnexpectedResult('did not complete subscription delivery');
482 }
483 })();
484 } catch (e) {
485 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
486 throw e;
487 }
488 }
489
490
491 subscriptionGet(dbCtx, callback, topicId) {
492 const _scope = _fileScope('subscriptionGet');
493 this.logger.debug(_scope, 'called', { callback, topicId });
494
495 let subscription;
496 try {
497 subscription = this.statement.subscriptionGet.get({ callback, topicId });
498 return DatabaseSQLite._subscriptionDataToNative(subscription);
499 } catch (e) {
500 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
501 throw e;
502 }
503 }
504
505
506 subscriptionGetById(dbCtx, subscriptionId) {
507 const _scope = _fileScope('subscriptionGetById');
508 this.logger.debug(_scope, 'called', { subscriptionId });
509
510 let subscription;
511 try {
512 subscription = this.statement.subscriptionGetById.get({ subscriptionId });
513 return DatabaseSQLite._subscriptionDataToNative(subscription);
514 } catch (e) {
515 this.logger.error(_scope, 'failed', { error: e, subscriptionId });
516 throw e;
517 }
518 }
519
520
521 subscriptionUpdate(dbCtx, data) {
522 const _scope = _fileScope('subscriptionUpdate');
523 this.logger.debug(_scope, 'called', { data });
524
525 const subscriptionData = {
526 ...data,
527 };
528
529 this._subscriptionUpdateDataValidate(subscriptionData);
530
531 try {
532 const result = this.statement.subscriptionUpdate.run(subscriptionData);
533 if (result.changes != 1) {
534 throw new DBErrors.UnexpectedResult('did not update subscription');
535 }
536 } catch (e) {
537 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
538 throw e;
539 }
540 }
541
542
543 subscriptionUpsert(dbCtx, data) {
544 const _scope = _fileScope('subscriptionUpsert');
545 this.logger.debug(_scope, 'called', { ...data });
546
547 const subscriptionData = {
548 secret: null,
549 httpRemoteAddr: null,
550 httpFrom: null,
551 ...data,
552 }
553 this._subscriptionUpsertDataValidate(subscriptionData);
554
555 let result;
556 try {
557 result = this.statement.subscriptionUpsert.run(subscriptionData);
558 if (result.changes != 1) {
559 throw new DBErrors.UnexpectedResult('did not upsert subscription');
560 }
561 return this._engineInfo(result);
562 } catch (e) {
563 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
564 throw e;
565 }
566 }
567
568
569 topicDeleted(dbCtx, topicId) {
570 const _scope = _fileScope('topicDeleted');
571 this.logger.debug(_scope, 'called', { topicId });
572
573 let result;
574 try {
575 result = this.statement.topicDeleted.run({ topicId });
576 if (result.changes != 1) {
577 throw new DBErrors.UnexpectedResult('did not update topic as deleted');
578 }
579 } catch (e) {
580 this.logger.error(_scope, 'failed', { error: e, topicId });
581 throw e;
582 }
583 }
584
585
586 topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
587 const _scope = _fileScope('topicFetchClaim');
588 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
589
590 let topicIds;
591 try {
592 this.db.transaction(() => {
593 topicIds = this.statement.topicContentFetchNeeded.all({ wanted }).map((claim) => claim.id);
594 topicIds.forEach((topicId) => {
595 const result = this.statement.topicContentFetchClaimById.run({ topicId, claimTimeoutSeconds, claimant });
596 if (result.changes != 1) {
597 throw new DBErrors.UnexpectedResult('did not claim topic fetch');
598 }
599 });
600 })();
601 return topicIds;
602 } catch (e) {
603 this.logger.error(_scope, 'failed to claim topics for fetch', { error: e, wanted, claimTimeoutSeconds, claimant, topicIds });
604 throw e;
605 }
606 }
607
608
609 topicFetchClaimById(dbCtx, topicId, claimTimeoutSeconds, claimant) {
610 const _scope = _fileScope('topicFetchClaimById');
611 this.logger.debug(_scope, 'called', { topicId, claimTimeoutSeconds, claimant });
612
613 let result;
614 try {
615 result = this.statement.topicContentFetchClaimById.run({ topicId, claimTimeoutSeconds, claimant });
616 if (result.changes != 1) {
617 throw new DBErrors.UnexpectedResult('did not claim topic fetch');
618 }
619 return this._engineInfo(result);
620 } catch (e) {
621 this.logger.error(_scope, 'failed to claim topics for fetch', { error: e, topicId, claimTimeoutSeconds, claimant });
622 throw e;
623 }
624 }
625
626
627 topicFetchComplete(dbCtx, topicId) {
628 const _scope = _fileScope('topicFetchComplete');
629 this.logger.debug(_scope, 'called', { topicId });
630
631 let result;
632 try {
633 this.db.transaction(() => {
634 result = this.statement.topicAttemptsReset.run({ topicId, forever: EPOCH_FOREVER });
635 if (result.changes != 1) {
636 throw new DBErrors.UnexpectedResult('did not reset topic attempts');
637 }
638 result = this.statement.topicContentFetchDone.run({ topicId });
639 if (result.changes != 1) {
640 throw new DBErrors.UnexpectedResult('did not release topic fetch');
641 }
642 })();
643 return this._engineInfo(result);
644 } catch (e) {
645 this.logger.error(_scope, 'failed', { error: e, result, topicId });
646 throw e;
647 }
648 }
649
650
651 topicFetchIncomplete(dbCtx, topicId, retryDelays = [60]) {
652 const _scope = _fileScope('topicFetchIncomplete');
653 this.logger.debug(_scope, 'called', { topicId });
654
655 let result;
656 try {
657 this.db.transaction(() => {
658 const { contentFetchAttemptsSinceSuccess: currentAttempt } = this.statement.topicAttempts.get({ topicId });
659 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
660 result = this.statement.topicAttemptsIncrement.run({ topicId, nextAttemptDelaySeconds });
661 if (result.changes != 1) {
662 throw new DBErrors.UnexpectedResult('did not set topic attempts');
663 }
664 result = this.statement.topicContentFetchDone.run({ topicId });
665 if (result.changes != 1) {
666 throw new DBErrors.UnexpectedResult('did not release topic fetch');
667 }
668 return result;
669 })();
670 return this._engineInfo(result);
671 } catch (e) {
672 this.logger.error(_scope, 'failed', { error: e, result, topicId });
673 throw e;
674 }
675 }
676
677
678 topicFetchRequested(dbCtx, topicId) {
679 const _scope = _fileScope('topicFetchRequested');
680 this.logger.debug(_scope, 'called', { topicId });
681
682 let result;
683 try {
684 result = this.statement.topicContentFetchRequested.run({ topicId });
685 if (result.changes != 1) {
686 throw new DBErrors.UnexpectedResult('did not set topic fetch requested');
687 }
688 return this._engineInfo(result);
689 } catch (e) {
690 this.logger.error(_scope, 'failed', { error: e, topicId });
691 throw e;
692 }
693 }
694
695
696 /**
697 * Converts engine topic fields to native types.
698 * @param {Object} data
699 */
700 static _topicDataToNative(data) {
701 const epochToDate = (epoch) => new Date(Number(epoch) * 1000);
702 if (data) {
703 data.isActive = !!data.isActive;
704 data.isDeleted = !!data.isDeleted;
705 ['created', 'lastPublish', 'contentFetchNextAttempt', 'contentUpdated'].forEach((field) => {
706 // eslint-disable-next-line security/detect-object-injection
707 data[field] = epochToDate(data[field]);
708 });
709 }
710 return data;
711 }
712
713
714 // eslint-disable-next-line no-unused-vars
715 topicGetAll(dbCtx) {
716 const _scope = _fileScope('topicGetAll');
717 this.logger.debug(_scope, 'called');
718
719 let topics;
720 try {
721 topics = this.statement.topicGetInfoAll.all();
722 } catch (e) {
723 this.logger.error(_scope, 'failed', { error: e, topics });
724 throw e;
725 }
726 if (topics) {
727 topics = topics
728 .map(DatabaseSQLite._topicDataToNative)
729 .map(this._topicDefaults.bind(this));
730 }
731 return topics;
732 }
733
734
735 topicGetById(dbCtx, topicId, applyDefaults = true) {
736 const _scope = _fileScope('topicGetById');
737 this.logger.debug(_scope, 'called', { topicId });
738
739 let topic;
740 try {
741 topic = this.statement.topicGetById.get({ topicId });
742 DatabaseSQLite._topicDataToNative(topic);
743 if (applyDefaults) {
744 topic = this._topicDefaults(topic);
745 }
746 return topic;
747 } catch (e) {
748 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
749 throw e;
750 }
751 }
752
753
754 topicGetByUrl(dbCtx, topicUrl) {
755 const _scope = _fileScope('topicGetByUrl');
756 this.logger.debug(_scope, 'called', { topicUrl });
757
758 let topic;
759 try {
760 topic = this.statement.topicGetByUrl.get({ topicUrl });
761 DatabaseSQLite._topicDataToNative(topic);
762 return this._topicDefaults(topic);
763 } catch (e) {
764 this.logger.error(_scope, 'failed', { error: e, topic, topicUrl });
765 throw e;
766 }
767 }
768
769
770 topicGetContentById(dbCtx, topicId) {
771 const _scope = _fileScope('topicGetContentById');
772 this.logger.debug(_scope, 'called', { topicId });
773
774 let topic;
775 try {
776 topic = this.statement.topicGetContentById.get({ topicId });
777 DatabaseSQLite._topicDataToNative(topic);
778 return this._topicDefaults(topic);
779 } catch (e) {
780 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
781 throw e;
782 }
783 }
784
785
786 topicPendingDelete(dbCtx, topicId) {
787 const _scope = _fileScope('topicPendingDelete');
788 this.logger.debug(_scope, 'called', { topicId });
789
790 try {
791 this.db.transaction(() => {
792 const topic = this.statement.topicGetById.get({ topicId });
793 if (!topic.isDeleted) {
794 this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
795 return;
796 }
797
798 const { count: subscriberCount } = this.statement.subscriptionCountByTopicUrl.get({ topicUrl: topic.url });
799 if (subscriberCount) {
800 this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
801 return;
802 }
803
804 const result = this.statement.topicDeleteById.run({ topicId });
805 if (result.changes !== 1) {
806 throw new DBErrors.UnexpectedResult('did not delete topic');
807 }
808 })();
809 this.logger.debug(_scope, 'success', { topicId });
810 } catch (e) {
811 this.logger.error(_scope, 'failed', { error: e, topicId });
812 throw e;
813 }
814 }
815
816
817 topicSet(dbCtx, data) {
818 const _scope = _fileScope('topicSet');
819 this.logger.debug(_scope, 'called', data);
820
821 const topicSetData = {
822 publisherValidationUrl: null,
823 leaseSecondsPreferred: null,
824 leaseSecondsMin: null,
825 leaseSecondsMax: null,
826 ...data,
827 };
828
829 let result;
830 try {
831 this._topicSetDataValidate(topicSetData);
832 result = this.statement.topicUpsert.run(topicSetData);
833 if (result.changes != 1) {
834 throw new DBErrors.UnexpectedResult('did not set topic data');
835 }
836 return this._engineInfo(result);
837 } catch (e) {
838 this.logger.error(_scope, 'failed', { error: e, result });
839 throw e;
840 }
841 }
842
843
844 topicSetContent(dbCtx, data) {
845 const _scope = _fileScope('topicSetContent');
846 const topicSetContentData = {
847 contentType: null,
848 ...data,
849 };
850 const logData = {
851 ...topicSetContentData,
852 content: common.logTruncate(topicSetContentData.content, 100),
853 };
854 this.logger.debug(_scope, 'called', logData);
855
856 let result;
857 try {
858 this._topicSetContentDataValidate(topicSetContentData);
859 result = this.statement.topicSetContent.run(topicSetContentData);
860 logData.result = result;
861 if (result.changes != 1) {
862 throw new DBErrors.UnexpectedResult('did not set topic content');
863 }
864 return this._engineInfo(result);
865 } catch (e) {
866 this.logger.error(_scope, 'failed', { error: e, ...logData });
867 throw e;
868 }
869 }
870
871
872 topicUpdate(dbCtx, data) {
873 const _scope = _fileScope('topicUpdate');
874 this.logger.debug(_scope, 'called', { data });
875
876 const topicData = {
877 leaseSecondsPreferred: null,
878 leaseSecondsMin: null,
879 leaseSecondsMax: null,
880 publisherValidationUrl: null,
881 ...data,
882 };
883
884 this._topicUpdateDataValidate(topicData);
885
886 try {
887 const result = this.statement.topicUpdate.run(topicData);
888 if (result.changes != 1) {
889 throw new DBErrors.UnexpectedResult('did not update topic');
890 }
891 } catch (e) {
892 this.logger.error(_scope, 'failed', { error: e, topicData });
893 throw e;
894 }
895 }
896
897
898 verificationClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
899 const _scope = _fileScope('verificationClaim');
900 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
901
902 let verificationIds;
903 try {
904 this.db.transaction(() => {
905 verificationIds = this.statement.verificationNeeded.all({ wanted }).map((claim) => claim.id);
906 verificationIds.forEach((verificationId) => {
907 const result = this.statement.verificationClaimById.run({ verificationId, claimTimeoutSeconds, claimant });
908 if (result.changes != 1) {
909 throw new DBErrors.UnexpectedResult('did not claim verification');
910 }
911 });
912 })();
913 return verificationIds;
914 } catch (e) {
915 this.logger.error(_scope, 'failed to claim verifications', { wanted, claimTimeoutSeconds });
916 throw e;
917 }
918 }
919
920
921 verificationClaimById(dbCtx, verificationId, claimTimeoutSeconds, claimant) {
922 const _scope = _fileScope('verificationClaimById');
923 this.logger.debug(_scope, 'called', { verificationId, claimTimeoutSeconds, claimant });
924
925 let result;
926 try {
927 result = this.statement.verificationClaimById.run({ verificationId, claimTimeoutSeconds, claimant });
928 if (result.changes != 1) {
929 throw new DBErrors.UnexpectedResult('did not claim verification');
930 }
931 return this._engineInfo(result);
932 } catch (e) {
933 this.logger.error(_scope, 'failed to claim verification', { error: e, verificationId, claimTimeoutSeconds, claimant });
934 throw e;
935 }
936 }
937
938
939 verificationComplete(dbCtx, verificationId, callback, topicId) {
940 const _scope = _fileScope('verificationComplete');
941 this.logger.debug(_scope, 'called', { verificationId });
942
943 let result;
944 try {
945 this.db.transaction(() => {
946 result = this.statement.verificationScrub.run({ verificationId, callback, topicId });
947 if (result.changes < 1) {
948 throw new DBErrors.UnexpectedResult('did not remove verifications');
949 }
950 })();
951 } catch (e) {
952 this.logger.error(_scope, 'failed', { verificationId });
953 throw e;
954 }
955 return this._engineInfo(result);
956 }
957
958
959 /**
960 * Converts engine verification fields to native types.
961 * @param {Object} data
962 */
963 static _verificationDataToNative(data) {
964 if (data) {
965 data.isPublisherValidated = !!data.isPublisherValidated;
966 }
967 }
968
969
970 verificationGetById(dbCtx, verificationId) {
971 const _scope = _fileScope('verificationGetById');
972 this.logger.debug(_scope, 'called', { verificationId });
973
974 let verification;
975 try {
976 verification = this.statement.verificationGetById.get({ verificationId });
977 DatabaseSQLite._verificationDataToNative(verification);
978 return verification;
979 } catch (e) {
980 this.logger.error(_scope, 'failed', { error: e, verificationId });
981 throw e;
982 }
983 }
984
985
986 verificationIncomplete(dbCtx, verificationId, retryDelays = [60]) {
987 const _scope = _fileScope('verificationIncomplete');
988 this.logger.debug(_scope, 'called', { verificationId });
989
990 let result;
991 try {
992 this.db.transaction(() => {
993 const { attempts: currentAttempt } = this.statement.verificationAttempts.get({ verificationId });
994 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
995 result = this.statement.verificationAttemptsIncrement.run({ verificationId, nextAttemptDelaySeconds });
996 if (result.changes != 1) {
997 throw new DBErrors.UnexpectedResult('did not increment verification attempts');
998 }
999 result = this.statement.verificationDone.run({ verificationId });
1000 if (result.changes != 1) {
1001 throw new DBErrors.UnexpectedResult('did not release verification in progress');
1002 }
1003 return result;
1004 })();
1005 return this._engineInfo(result);
1006 } catch (e) {
1007 this.logger.error(_scope, 'failed', { error: e, result, verificationId });
1008 throw e;
1009 }
1010 }
1011
1012
1013 /**
1014 * Convert native verification fields to engine types.
1015 */
1016 static _verificationDataToEngine(data) {
1017 if (data) {
1018 data.isPublisherValidated = data.isPublisherValidated ? 1 : 0;
1019 }
1020 }
1021
1022
1023 verificationInsert(dbCtx, verification) {
1024 const _scope = _fileScope('verificationInsert');
1025 this.logger.debug(_scope, 'called', { verification });
1026
1027 const verificationData = {
1028 secret: null,
1029 httpRemoteAddr: null,
1030 httpFrom: null,
1031 requestId: null,
1032 ...verification,
1033 };
1034
1035 let result, verificationId;
1036 try {
1037 this._verificationDataValidate(verificationData);
1038 DatabaseSQLite._verificationDataToEngine(verificationData);
1039 result = this.statement.verificationInsert.run(verificationData);
1040 if (result.changes != 1) {
1041 throw new DBErrors.UnexpectedResult('did not insert verification');
1042 }
1043 verificationId = result.lastInsertRowid;
1044 this.logger.debug(_scope, 'inserted verification', { verificationId });
1045
1046 return verificationId;
1047 } catch (e) {
1048 this.logger.error(_scope, 'failed', { error: e, verificationData });
1049 throw e;
1050 }
1051 }
1052
1053
1054 verificationRelease(dbCtx, verificationId) {
1055 const _scope = _fileScope('verificationRelease');
1056 this.logger.debug(_scope, 'called', { verificationId });
1057
1058 let result;
1059 try {
1060 result = this.statement.verificationDone.run({ verificationId });
1061 if (result.changes != 1) {
1062 throw new DBErrors.UnexpectedResult('did not release verification');
1063 }
1064 } catch (e) {
1065 this.logger.error(_scope, 'failed', { error: e, verificationId });
1066 throw e;
1067 }
1068 }
1069
1070
1071 verificationUpdate(dbCtx, verificationId, data) {
1072 const _scope = _fileScope('verificationUpdate');
1073 this.logger.debug(_scope, 'called', { verificationId, data });
1074
1075 const verificationData = {
1076 reason: null,
1077 verificationId,
1078 ...data,
1079 };
1080
1081 let result;
1082 try {
1083 this._verificationUpdateDataValidate(verificationData);
1084 DatabaseSQLite._verificationDataToEngine(verificationData);
1085 result = this.statement.verificationUpdate.run(verificationData);
1086 if (result.changes != 1) {
1087 throw new DBErrors.UnexpectedResult('did not update verification');
1088 }
1089 } catch (e) {
1090 this.logger.error(_scope, 'failed', { error: e, verificationData });
1091 throw e;
1092 }
1093 }
1094
1095
1096 verificationValidated(dbCtx, verificationId) {
1097 const _scope = _fileScope('verificationValidated');
1098 this.logger.debug(_scope, 'called', { verificationId });
1099
1100 let result;
1101 try {
1102 result = this.statement.verificationValidate.run({ verificationId });
1103 if (result.changes != 1) {
1104 throw new DBErrors.UnexpectedResult('did not set verification validation');
1105 }
1106 } catch (e) {
1107 this.logger.error(_scope, 'failed', { error: e, verificationId });
1108 throw e;
1109 }
1110 }
1111
1112
1113 }
1114
1115 module.exports = DatabaseSQLite;