fix topic update integration test, topicGetByUrl now optionally applies defaults
[websub-hub] / src / db / sqlite / index.js
1 'use strict';
2
3 const common = require('../../common');
4 const Database = require('../base');
5 const DBErrors = require('../errors');
6 const svh = require('../schema-version-helper');
7 const SQLite = require('better-sqlite3');
8 const fs = require('fs');
9 const path = require('path');
10 const { performance } = require('perf_hooks');
11
12 const _fileScope = common.fileScope(__filename);
13
14 const schemaVersionsSupported = {
15 min: {
16 major: 1,
17 minor: 0,
18 patch: 0,
19 },
20 max: {
21 major: 1,
22 minor: 0,
23 patch: 4,
24 },
25 };
26
27 // max of signed int64 (2^63 - 1), should be enough
28 const EPOCH_FOREVER = BigInt('9223372036854775807');
29 const epochToDate = (epoch) => new Date(Number(epoch) * 1000);
30 const dateToEpoch = (date) => Math.round(date.getTime() / 1000);
31
32 class DatabaseSQLite extends Database {
33 constructor(logger, options) {
34 super(logger, options);
35
36 const connectionString = options.db.connectionString || 'sqlite://:memory:';
37 const csDelim = '://';
38 const dbFilename = connectionString.slice(connectionString.indexOf(csDelim) + csDelim.length);
39
40 const queryLogLevel = options.db.queryLogLevel;
41
42 const sqliteOptions = {
43 ...(queryLogLevel && {
44 // eslint-disable-next-line security/detect-object-injection
45 verbose: (query) => this.logger[queryLogLevel](_fileScope('SQLite:verbose'), '', { query }),
46 }),
47 };
48 this.db = new SQLite(dbFilename, sqliteOptions);
49 this.schemaVersionsSupported = schemaVersionsSupported;
50 this.changesSinceLastOptimize = BigInt(0);
51 this.optimizeAfterChanges = options.db.connectionString.optimizeAfterChanges;
52 this.db.pragma('foreign_keys = on'); // Enforce consistency.
53 this.db.pragma('journal_mode = WAL'); // Be faster, expect local filesystem.
54 this.db.defaultSafeIntegers(true); // This probably isn't necessary, but by using these BigInts we keep weird floats out of the query logs.
55
56 this._initTables();
57 this._initStatements();
58 }
59
60
61 /**
62 * SQLite cannot prepare its statements without a schema, ensure such exists.
63 */
64 _initTables() {
65 const _scope = _fileScope('_initTables');
66
67 // Migrations rely upon this table, ensure it exists.
68 const metaVersionTable = '_meta_schema_version';
69 const tableExists = this.db.prepare('SELECT name FROM sqlite_master WHERE type=:type AND name=:name').pluck(true).bind({ type: 'table', name: metaVersionTable });
70 let metaExists = tableExists.get();
71 if (metaExists === undefined) {
72 const fPath = path.join(__dirname, 'sql', 'schema', 'init.sql');
73 // eslint-disable-next-line security/detect-non-literal-fs-filename
74 const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
75 this.db.exec(fSql);
76 metaExists = tableExists.get();
77 /* istanbul ignore if */
78 if (metaExists === undefined) {
79 throw new DBErrors.UnexpectedResult(`did not create ${metaVersionTable} table`);
80 }
81 this.logger.info(_scope, 'created schema version table', { metaVersionTable });
82 }
83
84 // Apply migrations
85 const currentSchema = this._currentSchema();
86 const migrationsWanted = svh.unappliedSchemaVersions(__dirname, currentSchema, this.schemaVersionsSupported);
87 this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
88 migrationsWanted.forEach((v) => {
89 const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
90 try {
91 // eslint-disable-next-line security/detect-non-literal-fs-filename
92 const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
93 this.logger.debug(_scope, 'applying migration', { version: v });
94 const results = this.db.exec(fSql);
95 this.logger.debug(_scope, 'migration results', { results });
96 this.logger.info(_scope, 'applied migration', { version: v });
97 } catch (e) {
98 this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v });
99 throw e;
100 }
101 });
102 }
103
104
105 _initStatements() {
106 const _scope = _fileScope('_initStatements');
107 const sqlDir = path.join(__dirname, 'sql');
108 this.statement = {};
109
110 // Decorate the statement calls we use with timing and logging.
111 const wrapFetch = (logName, statementName, fn) => {
112 const _wrapScope = _fileScope(logName);
113 return (...args) => {
114 const startTimestampMs = performance.now();
115 const rows = fn(...args);
116 DatabaseSQLite._deOphidiate(rows);
117 const elapsedTimeMs = performance.now() - startTimestampMs;
118 this.logger.debug(_wrapScope, 'complete', { statementName, elapsedTimeMs });
119 return rows;
120 };
121 };
122 const wrapRun = (logName, statementName, fn) => {
123 const _wrapScope = _fileScope(logName);
124 return (...args) => {
125 const startTimestampMs = performance.now();
126 const result = fn(...args);
127 const elapsedTimeMs = performance.now() - startTimestampMs;
128 this.logger.debug(_wrapScope, 'complete', { ...result, statementName, elapsedTimeMs });
129 result.duration = elapsedTimeMs;
130 return result;
131 };
132 };
133
134 // eslint-disable-next-line security/detect-non-literal-fs-filename
135 for (const f of fs.readdirSync(sqlDir)) {
136 const fPath = path.join(sqlDir, f);
137 const { name: fName, ext: fExt } = path.parse(f);
138 // eslint-disable-next-line security/detect-non-literal-fs-filename
139 const stat = fs.statSync(fPath);
140 if (!stat.isFile()
141 || fExt.toLowerCase() !== '.sql') {
142 continue;
143 }
144 // eslint-disable-next-line security/detect-non-literal-fs-filename
145 const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
146 const statementName = Database._camelfy(fName.toLowerCase(), '-');
147 let statement;
148 try {
149 statement = this.db.prepare(fSql);
150 } catch (e) {
151 /* istanbul ignore next */
152 this.logger.error(_scope, 'failed to prepare statement', { error: e, file: f });
153 /* istanbul ignore next */
154 throw e;
155 }
156 // eslint-disable-next-line security/detect-object-injection
157 this.statement[statementName] = statement;
158 const { get: origGet, all: origAll, run: origRun } = statement;
159 statement.get = wrapFetch('SQLite:get', statementName, origGet.bind(statement));
160 statement.all = wrapFetch('SQLite:all', statementName, origAll.bind(statement));
161 statement.run = wrapRun('SQLite:run', statementName, origRun.bind(statement));
162 }
163 this.statement._optimize = this.db.prepare('SELECT * FROM pragma_optimize(0x03)');
164
165 this.logger.debug(_scope, 'statements initialized', { statements: Object.keys(this.statement).length });
166 }
167
168
169 static _deOphidiate(rows) {
170 const rowsIsArray = Array.isArray(rows);
171 if (!rowsIsArray) {
172 rows = [rows];
173 }
174 const exemplaryRow = rows[0];
175 for (const prop in exemplaryRow) {
176 const camel = Database._camelfy(prop);
177 if (!(camel in exemplaryRow)) {
178 for (const d of rows) {
179 // eslint-disable-next-line security/detect-object-injection
180 d[camel] = d[prop];
181 // eslint-disable-next-line security/detect-object-injection
182 delete d[prop];
183 }
184 }
185 }
186 return rowsIsArray ? rows : rows[0];
187 }
188
189
190 _currentSchema() {
191 return this.db.prepare('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1').get();
192 }
193
194
195 healthCheck() {
196 const _scope = _fileScope('healthCheck');
197 this.logger.debug(_scope, 'called', {});
198 if (!this.db.open) {
199 throw new DBErrors.UnexpectedResult('database is not open');
200 }
201 return { open: this.db.open };
202 }
203
204
205 _engineInfo(result) {
206 if (result.changes) {
207 this.changesSinceLastOptimize += BigInt(result.changes);
208 this._optimize();
209 }
210 return {
211 changes: Number(result.changes),
212 lastInsertRowid: result.lastInsertRowid,
213 };
214 }
215
216
217 _closeConnection() {
218 this.db.close();
219 }
220
221
222 _optimize() {
223 const _scope = _fileScope('_optimize');
224
225 if (this.optimizeAfterChanges
226 && this.changesSinceLastOptimize >= this.optimizeAfterChanges) {
227 const optimize = this.statement._optimize.all();
228 this.logger.debug(_scope, 'optimize', { optimize });
229 this.db.pragma('optimize');
230 this.changesSinceLastOptimize = BigInt(0);
231 }
232 }
233
234
235 _purgeTables(really) {
236 if (really) {
237 [
238 'topic',
239 'topic_fetch_in_progress',
240 'verification',
241 'verification_in_progress',
242 'subscription',
243 'subscription_delivery_in_progress',
244 ].map((table) => {
245 const result = this.db.prepare(`DELETE FROM ${table}`).run();
246 this.logger.debug(_fileScope('_purgeTables'), 'success', { table, result });
247 });
248 }
249 }
250
251
252 context(fn) {
253 return fn(this.db);
254 }
255
256
257 transaction(dbCtx, fn) {
258 dbCtx = dbCtx || this.db;
259 return dbCtx.transaction(fn)();
260 }
261
262
263 authenticationSuccess(dbCtx, identifier) {
264 const _scope = _fileScope('authenticationSuccess');
265 this.logger.debug(_scope, 'called', { identifier });
266
267 let result;
268 try {
269 result = this.statement.authenticationSuccess.run({ identifier });
270 if (result.changes != 1) {
271 throw new DBErrors.UnexpectedResult('did not update authentication success');
272 }
273 } catch (e) {
274 this.logger.error(_scope, 'failed', { error: e, identifier });
275 throw e;
276 }
277 }
278
279
280 authenticationGet(dbCtx, identifier) {
281 const _scope = _fileScope('authenticationGet');
282 this.logger.debug(_scope, 'called', { identifier });
283
284 try {
285 return this.statement.authenticationGet.get({ identifier });
286 } catch (e) {
287 this.logger.error(_scope, 'failed', { error: e, identifier });
288 throw e;
289 }
290 }
291
292
293 authenticationUpsert(dbCtx, identifier, credential) {
294 const _scope = _fileScope('authenticationUpsert');
295 const scrubbedCredential = '*'.repeat((credential || '').length);
296 this.logger.debug(_scope, 'called', { identifier, scrubbedCredential });
297
298 let result;
299 try {
300 result = this.statement.authenticationUpsert.run({ identifier, credential });
301 if (result.changes != 1) {
302 throw new DBErrors.UnexpectedResult('did not upsert authentication');
303 }
304 } catch (e) {
305 this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential })
306 throw e;
307 }
308 }
309
310
311 /**
312 * Converts engine subscription fields to native types.
313 * @param {Object} data
314 */
315 static _subscriptionDataToNative(data) {
316 if (data) {
317 ['created', 'verified', 'expires', 'contentDelivered'].forEach((field) => {
318 // eslint-disable-next-line security/detect-object-injection
319 data[field] = epochToDate(data[field]);
320 });
321 }
322 return data;
323 }
324
325
326 subscriptionsByTopicId(dbCtx, topicId) {
327 const _scope = _fileScope('subscriptionsByTopicId');
328 this.logger.debug(_scope, 'called', { topicId });
329
330 try {
331 const subscriptions = this.statement.subscriptionsByTopicId.all({ topicId });
332 return subscriptions.map((s) => DatabaseSQLite._subscriptionDataToNative(s));
333 } catch (e) {
334 this.logger.error(_scope, 'failed', { error: e, topicId });
335 throw e;
336 }
337 }
338
339
340 subscriptionCountByTopicUrl(dbCtx, topicUrl) {
341 const _scope = _fileScope('subscriptionCountByTopicUrl');
342 this.logger.debug(_scope, 'called', { topicUrl });
343
344 try {
345 return this.statement.subscriptionCountByTopicUrl.get({ topicUrl });
346 } catch (e) {
347 this.logger.error(_scope, 'failed', { error: e, topicUrl });
348 throw e;
349 }
350 }
351
352
353 subscriptionDelete(dbCtx, callback, topicId) {
354 const _scope = _fileScope('subscriptionDelete');
355 this.logger.debug(_scope, 'called', { callback, topicId });
356
357 try {
358 const result = this.statement.subscriptionDelete.run({ callback, topicId });
359 if (result.changes != 1) {
360 throw new DBErrors.UnexpectedResult('did not delete subscription');
361 }
362 return this._engineInfo(result);
363 } catch (e) {
364 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
365 throw e;
366 }
367 }
368
369
370 subscriptionDeleteExpired(dbCtx, topicId) {
371 const _scope = _fileScope('subscriptionDeleteExpired');
372 this.logger.debug(_scope, 'called', { topicId });
373
374 try {
375 const result = this.statement.subscriptionDeleteExpired.run({ topicId });
376 this.logger.debug(_scope, 'success', { topicId, deleted: result.changes });
377 return this._engineInfo(result);
378 } catch (e) {
379 this.logger.error(_scope, 'failed', { error: e, topicId });
380 throw e;
381 }
382 }
383
384
385 subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
386 const _scope = _fileScope('subscriptionDeliveryClaim');
387 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
388
389 let subscriptionIds;
390 try {
391 this.db.transaction(() => {
392 subscriptionIds = this.statement.subscriptionDeliveryNeeded.all({ wanted }).map((claim) => claim.id);
393 subscriptionIds.forEach((subscriptionId) => {
394 const result = this.statement.subscriptionDeliveryClaimById.run({ subscriptionId, claimTimeoutSeconds, claimant });
395 if (result.changes != 1) {
396 throw new DBErrors.UnexpectedResult('did not claim subscription delivery');
397 }
398 });
399 })();
400 return subscriptionIds;
401 } catch (e) {
402 this.logger.error(_scope, 'failed', { error: e, wanted, claimTimeoutSeconds, claimant, subscriptionIds });
403 throw e;
404 }
405 }
406
407
408 subscriptionDeliveryClaimById(dbCtx, subscriptionId, claimTimeoutSeconds, claimant) {
409 const _scope = _fileScope('subscriptionDeliveryClaimById');
410 this.logger.debug(_scope, 'called', { subscriptionId, claimTimeoutSeconds, claimant });
411
412 try {
413 const result = this.statement.subscriptionDeliveryClaimById.run({ subscriptionId, claimTimeoutSeconds, claimant });
414 if (result.changes != 1) {
415 throw new DBErrors.UnexpectedResult('did not claim subscription delivery');
416 }
417 return this._engineInfo(result);
418 } catch (e) {
419 this.logger.error(_scope, 'failed', { error: e, subscriptionId, claimTimeoutSeconds, claimant });
420 throw e;
421 }
422 }
423
424
425 subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) {
426 const _scope = _fileScope('subscriptionDeliveryComplete');
427 this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated });
428
429 let result;
430 try {
431 this.db.transaction(() => {
432 topicContentUpdated = dateToEpoch(topicContentUpdated);
433 result = this.statement.subscriptionDeliverySuccess.run({ callback, topicId, topicContentUpdated });
434 if (result.changes != 1) {
435 throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
436 }
437 result = this.statement.subscriptionDeliveryDone.run({ callback, topicId });
438 if (result.changes != 1) {
439 throw new DBErrors.UnexpectedResult('did not complete subscription delivery');
440 }
441 })();
442 return this._engineInfo(result);
443 } catch (e) {
444 this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated });
445 throw e;
446 }
447 }
448
449
450 subscriptionDeliveryGone(dbCtx, callback, topicId) {
451 const _scope = _fileScope('subscriptionDeliveryGone');
452 this.logger.debug(_scope, 'called', { callback, topicId });
453
454 let result;
455 try {
456 this.db.transaction(() => {
457 result = this.statement.subscriptionDelete.run({ callback, topicId });
458 if (result.changes != 1) {
459 throw new DBErrors.UnexpectedResult('did not delete subscription');
460 }
461 // Delete cascades to delivery
462 // result = this.statement.subscriptionDeliveryDone.run({ callback, topicId });
463 // if (result.changes != 1) {
464 // throw new DBErrors.UnexpectedResult('did not complete subscription delivery');
465 // }
466 })();
467 return this._engineInfo(result);
468 } catch (e) {
469 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
470 throw e;
471 }
472 }
473
474
475 subscriptionDeliveryIncomplete(dbCtx, callback, topicId, retryDelays = [60]) {
476 const _scope = _fileScope('subscriptionDeliveryIncomplete');
477 this.logger.debug(_scope, 'called', { callback, topicId, retryDelays });
478
479 let result;
480 try {
481 this.db.transaction(() => {
482 const { currentAttempt } = this.statement.subscriptionDeliveryAttempts.get({ callback, topicId });
483 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
484 result = this.statement.subscriptionDeliveryFailure.run({ nextAttemptDelaySeconds, callback, topicId });
485 if (result.changes != 1) {
486 throw new DBErrors.UnexpectedResult('did not set delivery failure');
487 }
488 result = this.statement.subscriptionDeliveryDone.run({ callback, topicId });
489 if (result.changes != 1) {
490 throw new DBErrors.UnexpectedResult('did not complete subscription delivery');
491 }
492 })();
493 } catch (e) {
494 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
495 throw e;
496 }
497 }
498
499
500 subscriptionGet(dbCtx, callback, topicId) {
501 const _scope = _fileScope('subscriptionGet');
502 this.logger.debug(_scope, 'called', { callback, topicId });
503
504 let subscription;
505 try {
506 subscription = this.statement.subscriptionGet.get({ callback, topicId });
507 return DatabaseSQLite._subscriptionDataToNative(subscription);
508 } catch (e) {
509 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
510 throw e;
511 }
512 }
513
514
515 subscriptionGetById(dbCtx, subscriptionId) {
516 const _scope = _fileScope('subscriptionGetById');
517 this.logger.debug(_scope, 'called', { subscriptionId });
518
519 let subscription;
520 try {
521 subscription = this.statement.subscriptionGetById.get({ subscriptionId });
522 return DatabaseSQLite._subscriptionDataToNative(subscription);
523 } catch (e) {
524 this.logger.error(_scope, 'failed', { error: e, subscriptionId });
525 throw e;
526 }
527 }
528
529
530 subscriptionUpdate(dbCtx, data) {
531 const _scope = _fileScope('subscriptionUpdate');
532 this.logger.debug(_scope, 'called', { data });
533
534 const subscriptionData = {
535 ...data,
536 };
537
538 this._subscriptionUpdateDataValidate(subscriptionData);
539
540 try {
541 const result = this.statement.subscriptionUpdate.run(subscriptionData);
542 if (result.changes != 1) {
543 throw new DBErrors.UnexpectedResult('did not update subscription');
544 }
545 } catch (e) {
546 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
547 throw e;
548 }
549 }
550
551
552 subscriptionUpsert(dbCtx, data) {
553 const _scope = _fileScope('subscriptionUpsert');
554 this.logger.debug(_scope, 'called', { ...data });
555
556 const subscriptionData = {
557 secret: null,
558 httpRemoteAddr: null,
559 httpFrom: null,
560 ...data,
561 }
562 this._subscriptionUpsertDataValidate(subscriptionData);
563
564 let result;
565 try {
566 result = this.statement.subscriptionUpsert.run(subscriptionData);
567 if (result.changes != 1) {
568 throw new DBErrors.UnexpectedResult('did not upsert subscription');
569 }
570 return this._engineInfo(result);
571 } catch (e) {
572 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
573 throw e;
574 }
575 }
576
577
578 topicDeleted(dbCtx, topicId) {
579 const _scope = _fileScope('topicDeleted');
580 this.logger.debug(_scope, 'called', { topicId });
581
582 let result;
583 try {
584 result = this.statement.topicDeleted.run({ topicId });
585 if (result.changes != 1) {
586 throw new DBErrors.UnexpectedResult('did not update topic as deleted');
587 }
588 } catch (e) {
589 this.logger.error(_scope, 'failed', { error: e, topicId });
590 throw e;
591 }
592 }
593
594
595 topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
596 const _scope = _fileScope('topicFetchClaim');
597 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
598
599 let topicIds;
600 try {
601 this.db.transaction(() => {
602 topicIds = this.statement.topicContentFetchNeeded.all({ wanted }).map((claim) => claim.id);
603 topicIds.forEach((topicId) => {
604 const result = this.statement.topicContentFetchClaimById.run({ topicId, claimTimeoutSeconds, claimant });
605 if (result.changes != 1) {
606 throw new DBErrors.UnexpectedResult('did not claim topic fetch');
607 }
608 });
609 })();
610 return topicIds;
611 } catch (e) {
612 this.logger.error(_scope, 'failed to claim topics for fetch', { error: e, wanted, claimTimeoutSeconds, claimant, topicIds });
613 throw e;
614 }
615 }
616
617
618 topicFetchClaimById(dbCtx, topicId, claimTimeoutSeconds, claimant) {
619 const _scope = _fileScope('topicFetchClaimById');
620 this.logger.debug(_scope, 'called', { topicId, claimTimeoutSeconds, claimant });
621
622 let result;
623 try {
624 result = this.statement.topicContentFetchClaimById.run({ topicId, claimTimeoutSeconds, claimant });
625 if (result.changes != 1) {
626 throw new DBErrors.UnexpectedResult('did not claim topic fetch');
627 }
628 return this._engineInfo(result);
629 } catch (e) {
630 this.logger.error(_scope, 'failed to claim topics for fetch', { error: e, topicId, claimTimeoutSeconds, claimant });
631 throw e;
632 }
633 }
634
635
636 topicFetchComplete(dbCtx, topicId) {
637 const _scope = _fileScope('topicFetchComplete');
638 this.logger.debug(_scope, 'called', { topicId });
639
640 let result;
641 try {
642 this.db.transaction(() => {
643 result = this.statement.topicAttemptsReset.run({ topicId, forever: EPOCH_FOREVER });
644 if (result.changes != 1) {
645 throw new DBErrors.UnexpectedResult('did not reset topic attempts');
646 }
647 result = this.statement.topicContentFetchDone.run({ topicId });
648 if (result.changes != 1) {
649 throw new DBErrors.UnexpectedResult('did not release topic fetch');
650 }
651 })();
652 return this._engineInfo(result);
653 } catch (e) {
654 this.logger.error(_scope, 'failed', { error: e, result, topicId });
655 throw e;
656 }
657 }
658
659
660 topicFetchIncomplete(dbCtx, topicId, retryDelays = [60]) {
661 const _scope = _fileScope('topicFetchIncomplete');
662 this.logger.debug(_scope, 'called', { topicId });
663
664 let result;
665 try {
666 this.db.transaction(() => {
667 const { contentFetchAttemptsSinceSuccess: currentAttempt } = this.statement.topicAttempts.get({ topicId });
668 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
669 result = this.statement.topicAttemptsIncrement.run({ topicId, nextAttemptDelaySeconds });
670 if (result.changes != 1) {
671 throw new DBErrors.UnexpectedResult('did not set topic attempts');
672 }
673 result = this.statement.topicContentFetchDone.run({ topicId });
674 if (result.changes != 1) {
675 throw new DBErrors.UnexpectedResult('did not release topic fetch');
676 }
677 return result;
678 })();
679 return this._engineInfo(result);
680 } catch (e) {
681 this.logger.error(_scope, 'failed', { error: e, result, topicId });
682 throw e;
683 }
684 }
685
686
687 topicFetchRequested(dbCtx, topicId) {
688 const _scope = _fileScope('topicFetchRequested');
689 this.logger.debug(_scope, 'called', { topicId });
690
691 let result;
692 try {
693 result = this.statement.topicContentFetchRequested.run({ topicId });
694 if (result.changes != 1) {
695 throw new DBErrors.UnexpectedResult('did not set topic fetch requested');
696 }
697 return this._engineInfo(result);
698 } catch (e) {
699 this.logger.error(_scope, 'failed', { error: e, topicId });
700 throw e;
701 }
702 }
703
704
705 /**
706 * Converts engine topic fields to native types.
707 * @param {Object} data
708 */
709 static _topicDataToNative(data) {
710 if (data) {
711 data.isActive = !!data.isActive;
712 data.isDeleted = !!data.isDeleted;
713 ['created', 'lastPublish', 'contentFetchNextAttempt', 'contentUpdated'].forEach((field) => {
714 // eslint-disable-next-line security/detect-object-injection
715 data[field] = epochToDate(data[field]);
716 });
717 }
718 return data;
719 }
720
721
722 // eslint-disable-next-line no-unused-vars
723 topicGetAll(dbCtx) {
724 const _scope = _fileScope('topicGetAll');
725 this.logger.debug(_scope, 'called');
726
727 let topics;
728 try {
729 topics = this.statement.topicGetInfoAll.all();
730 } catch (e) {
731 this.logger.error(_scope, 'failed', { error: e, topics });
732 throw e;
733 }
734 if (topics) {
735 topics = topics
736 .map(DatabaseSQLite._topicDataToNative)
737 .map(this._topicDefaults.bind(this));
738 }
739 return topics;
740 }
741
742
743 topicGetById(dbCtx, topicId, applyDefaults = true) {
744 const _scope = _fileScope('topicGetById');
745 this.logger.debug(_scope, 'called', { topicId });
746
747 let topic;
748 try {
749 topic = this.statement.topicGetById.get({ topicId });
750 DatabaseSQLite._topicDataToNative(topic);
751 if (applyDefaults) {
752 topic = this._topicDefaults(topic);
753 }
754 return topic;
755 } catch (e) {
756 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
757 throw e;
758 }
759 }
760
761
762 topicGetByUrl(dbCtx, topicUrl, applyDefaults = true) {
763 const _scope = _fileScope('topicGetByUrl');
764 this.logger.debug(_scope, 'called', { topicUrl });
765
766 let topic;
767 try {
768 topic = this.statement.topicGetByUrl.get({ topicUrl });
769 DatabaseSQLite._topicDataToNative(topic);
770 if (applyDefaults) {
771 topic = this._topicDefaults(topic);
772 }
773 return topic;
774 } catch (e) {
775 this.logger.error(_scope, 'failed', { error: e, topic, topicUrl });
776 throw e;
777 }
778 }
779
780
781 topicGetContentById(dbCtx, topicId) {
782 const _scope = _fileScope('topicGetContentById');
783 this.logger.debug(_scope, 'called', { topicId });
784
785 let topic;
786 try {
787 topic = this.statement.topicGetContentById.get({ topicId });
788 DatabaseSQLite._topicDataToNative(topic);
789 return this._topicDefaults(topic);
790 } catch (e) {
791 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
792 throw e;
793 }
794 }
795
796
797 topicPendingDelete(dbCtx, topicId) {
798 const _scope = _fileScope('topicPendingDelete');
799 this.logger.debug(_scope, 'called', { topicId });
800
801 try {
802 this.db.transaction(() => {
803 const topic = this.statement.topicGetById.get({ topicId });
804 if (!topic.isDeleted) {
805 this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
806 return;
807 }
808
809 const { count: subscriberCount } = this.statement.subscriptionCountByTopicUrl.get({ topicUrl: topic.url });
810 if (subscriberCount) {
811 this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
812 return;
813 }
814
815 const result = this.statement.topicDeleteById.run({ topicId });
816 if (result.changes !== 1) {
817 throw new DBErrors.UnexpectedResult('did not delete topic');
818 }
819 })();
820 this.logger.debug(_scope, 'success', { topicId });
821 } catch (e) {
822 this.logger.error(_scope, 'failed', { error: e, topicId });
823 throw e;
824 }
825 }
826
827
828 topicPublishHistory(dbCtx, topicId, days) {
829 const _scope = _fileScope('topicPublishHistory');
830 this.logger.debug(_scope, 'called', { topicId, days })
831
832 const events = this.statement.topicPublishHistory.all({ topicId, daysAgo: days });
833 const history = Array.from({ length: days }, () => 0);
834 events.forEach(({ daysAgo, contentUpdates }) => history[daysAgo] = Number(contentUpdates));
835
836 return history;
837 }
838
839
840 topicSet(dbCtx, data) {
841 const _scope = _fileScope('topicSet');
842 this.logger.debug(_scope, 'called', data);
843
844 const topicSetData = {
845 publisherValidationUrl: null,
846 leaseSecondsPreferred: null,
847 leaseSecondsMin: null,
848 leaseSecondsMax: null,
849 ...data,
850 };
851
852 let result;
853 try {
854 this._topicSetDataValidate(topicSetData);
855 result = this.statement.topicUpsert.run(topicSetData);
856 if (result.changes != 1) {
857 throw new DBErrors.UnexpectedResult('did not set topic data');
858 }
859 return this._engineInfo(result);
860 } catch (e) {
861 this.logger.error(_scope, 'failed', { error: e, result });
862 throw e;
863 }
864 }
865
866
867 topicSetContent(dbCtx, data) {
868 const _scope = _fileScope('topicSetContent');
869 const topicSetContentData = {
870 contentType: null,
871 httpETag: null,
872 httpLastModified: null,
873 ...data,
874 };
875 const logData = {
876 ...topicSetContentData,
877 content: common.logTruncate(topicSetContentData.content, 100),
878 };
879 this.logger.debug(_scope, 'called', logData);
880
881 let result;
882 try {
883 this._topicSetContentDataValidate(topicSetContentData);
884 result = this.statement.topicSetContent.run(topicSetContentData);
885 logData.result = result;
886 if (result.changes != 1) {
887 throw new DBErrors.UnexpectedResult('did not set topic content');
888 }
889 result = this.statement.topicSetContentHistory.run({
890 topicId: data.topicId,
891 contentHash: data.contentHash,
892 contentSize: data.content.length,
893 });
894 if (result.changes != 1) {
895 throw new DBErrors.UnexpectedResult('did not set topic content history');
896 }
897 return this._engineInfo(result);
898 } catch (e) {
899 this.logger.error(_scope, 'failed', { error: e, ...logData });
900 throw e;
901 }
902 }
903
904
905 topicUpdate(dbCtx, data) {
906 const _scope = _fileScope('topicUpdate');
907 this.logger.debug(_scope, 'called', { data });
908
909 const topicData = {
910 leaseSecondsPreferred: null,
911 leaseSecondsMin: null,
912 leaseSecondsMax: null,
913 publisherValidationUrl: null,
914 ...data,
915 };
916
917 this._topicUpdateDataValidate(topicData);
918
919 try {
920 const result = this.statement.topicUpdate.run(topicData);
921 if (result.changes != 1) {
922 throw new DBErrors.UnexpectedResult('did not update topic');
923 }
924 } catch (e) {
925 this.logger.error(_scope, 'failed', { error: e, topicData });
926 throw e;
927 }
928 }
929
930
931 verificationClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
932 const _scope = _fileScope('verificationClaim');
933 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
934
935 let verificationIds;
936 try {
937 this.db.transaction(() => {
938 verificationIds = this.statement.verificationNeeded.all({ wanted }).map((claim) => claim.id);
939 verificationIds.forEach((verificationId) => {
940 const result = this.statement.verificationClaimById.run({ verificationId, claimTimeoutSeconds, claimant });
941 if (result.changes != 1) {
942 throw new DBErrors.UnexpectedResult('did not claim verification');
943 }
944 });
945 })();
946 return verificationIds;
947 } catch (e) {
948 this.logger.error(_scope, 'failed to claim verifications', { wanted, claimTimeoutSeconds });
949 throw e;
950 }
951 }
952
953
954 verificationClaimById(dbCtx, verificationId, claimTimeoutSeconds, claimant) {
955 const _scope = _fileScope('verificationClaimById');
956 this.logger.debug(_scope, 'called', { verificationId, claimTimeoutSeconds, claimant });
957
958 let result;
959 try {
960 result = this.statement.verificationClaimById.run({ verificationId, claimTimeoutSeconds, claimant });
961 if (result.changes != 1) {
962 throw new DBErrors.UnexpectedResult('did not claim verification');
963 }
964 return this._engineInfo(result);
965 } catch (e) {
966 this.logger.error(_scope, 'failed to claim verification', { error: e, verificationId, claimTimeoutSeconds, claimant });
967 throw e;
968 }
969 }
970
971
972 verificationComplete(dbCtx, verificationId, callback, topicId) {
973 const _scope = _fileScope('verificationComplete');
974 this.logger.debug(_scope, 'called', { verificationId });
975
976 let result;
977 try {
978 this.db.transaction(() => {
979 result = this.statement.verificationScrub.run({ verificationId, callback, topicId });
980 if (result.changes < 1) {
981 throw new DBErrors.UnexpectedResult('did not remove verifications');
982 }
983 })();
984 } catch (e) {
985 this.logger.error(_scope, 'failed', { verificationId });
986 throw e;
987 }
988 return this._engineInfo(result);
989 }
990
991
992 /**
993 * Converts engine verification fields to native types.
994 * @param {Object} data
995 */
996 static _verificationDataToNative(data) {
997 if (data) {
998 data.isPublisherValidated = !!data.isPublisherValidated;
999 }
1000 }
1001
1002
1003 verificationGetById(dbCtx, verificationId) {
1004 const _scope = _fileScope('verificationGetById');
1005 this.logger.debug(_scope, 'called', { verificationId });
1006
1007 let verification;
1008 try {
1009 verification = this.statement.verificationGetById.get({ verificationId });
1010 DatabaseSQLite._verificationDataToNative(verification);
1011 return verification;
1012 } catch (e) {
1013 this.logger.error(_scope, 'failed', { error: e, verificationId });
1014 throw e;
1015 }
1016 }
1017
1018
1019 verificationIncomplete(dbCtx, verificationId, retryDelays = [60]) {
1020 const _scope = _fileScope('verificationIncomplete');
1021 this.logger.debug(_scope, 'called', { verificationId });
1022
1023 let result;
1024 try {
1025 this.db.transaction(() => {
1026 const { attempts: currentAttempt } = this.statement.verificationAttempts.get({ verificationId });
1027 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
1028 result = this.statement.verificationAttemptsIncrement.run({ verificationId, nextAttemptDelaySeconds });
1029 if (result.changes != 1) {
1030 throw new DBErrors.UnexpectedResult('did not increment verification attempts');
1031 }
1032 result = this.statement.verificationDone.run({ verificationId });
1033 if (result.changes != 1) {
1034 throw new DBErrors.UnexpectedResult('did not release verification in progress');
1035 }
1036 return result;
1037 })();
1038 return this._engineInfo(result);
1039 } catch (e) {
1040 this.logger.error(_scope, 'failed', { error: e, result, verificationId });
1041 throw e;
1042 }
1043 }
1044
1045
1046 /**
1047 * Convert native verification fields to engine types.
1048 */
1049 static _verificationDataToEngine(data) {
1050 if (data) {
1051 data.isPublisherValidated = data.isPublisherValidated ? 1 : 0;
1052 }
1053 }
1054
1055
1056 verificationInsert(dbCtx, verification) {
1057 const _scope = _fileScope('verificationInsert');
1058 this.logger.debug(_scope, 'called', { verification });
1059
1060 const verificationData = {
1061 secret: null,
1062 httpRemoteAddr: null,
1063 httpFrom: null,
1064 requestId: null,
1065 ...verification,
1066 };
1067
1068 let result, verificationId;
1069 try {
1070 this._verificationDataValidate(verificationData);
1071 DatabaseSQLite._verificationDataToEngine(verificationData);
1072 result = this.statement.verificationInsert.run(verificationData);
1073 if (result.changes != 1) {
1074 throw new DBErrors.UnexpectedResult('did not insert verification');
1075 }
1076 verificationId = result.lastInsertRowid;
1077 this.logger.debug(_scope, 'inserted verification', { verificationId });
1078
1079 return verificationId;
1080 } catch (e) {
1081 this.logger.error(_scope, 'failed', { error: e, verificationData });
1082 throw e;
1083 }
1084 }
1085
1086
1087 verificationRelease(dbCtx, verificationId) {
1088 const _scope = _fileScope('verificationRelease');
1089 this.logger.debug(_scope, 'called', { verificationId });
1090
1091 let result;
1092 try {
1093 result = this.statement.verificationDone.run({ verificationId });
1094 if (result.changes != 1) {
1095 throw new DBErrors.UnexpectedResult('did not release verification');
1096 }
1097 } catch (e) {
1098 this.logger.error(_scope, 'failed', { error: e, verificationId });
1099 throw e;
1100 }
1101 }
1102
1103
1104 verificationUpdate(dbCtx, verificationId, data) {
1105 const _scope = _fileScope('verificationUpdate');
1106 this.logger.debug(_scope, 'called', { verificationId, data });
1107
1108 const verificationData = {
1109 reason: null,
1110 verificationId,
1111 ...data,
1112 };
1113
1114 let result;
1115 try {
1116 this._verificationUpdateDataValidate(verificationData);
1117 DatabaseSQLite._verificationDataToEngine(verificationData);
1118 result = this.statement.verificationUpdate.run(verificationData);
1119 if (result.changes != 1) {
1120 throw new DBErrors.UnexpectedResult('did not update verification');
1121 }
1122 } catch (e) {
1123 this.logger.error(_scope, 'failed', { error: e, verificationData });
1124 throw e;
1125 }
1126 }
1127
1128
1129 verificationValidated(dbCtx, verificationId) {
1130 const _scope = _fileScope('verificationValidated');
1131 this.logger.debug(_scope, 'called', { verificationId });
1132
1133 let result;
1134 try {
1135 result = this.statement.verificationValidate.run({ verificationId });
1136 if (result.changes != 1) {
1137 throw new DBErrors.UnexpectedResult('did not set verification validation');
1138 }
1139 } catch (e) {
1140 this.logger.error(_scope, 'failed', { error: e, verificationId });
1141 throw e;
1142 }
1143 }
1144
1145
1146 }
1147
1148 module.exports = DatabaseSQLite;