database migration 1.0.4, store topic fetch etag/last-modified, provide these when...
[websub-hub] / src / db / sqlite / index.js
1 'use strict';
2
3 const common = require('../../common');
4 const Database = require('../base');
5 const DBErrors = require('../errors');
6 const svh = require('../schema-version-helper');
7 const SQLite = require('better-sqlite3');
8 const fs = require('fs');
9 const path = require('path');
10 const { performance } = require('perf_hooks');
11
12 const _fileScope = common.fileScope(__filename);
13
14 const schemaVersionsSupported = {
15 min: {
16 major: 1,
17 minor: 0,
18 patch: 0,
19 },
20 max: {
21 major: 1,
22 minor: 0,
23 patch: 4,
24 },
25 };
26
27 // max of signed int64 (2^63 - 1), should be enough
28 const EPOCH_FOREVER = BigInt('9223372036854775807');
29 const epochToDate = (epoch) => new Date(Number(epoch) * 1000);
30 const dateToEpoch = (date) => Math.round(date.getTime() / 1000);
31
32 class DatabaseSQLite extends Database {
33 constructor(logger, options) {
34 super(logger, options);
35
36 const connectionString = options.db.connectionString || 'sqlite://:memory:';
37 const csDelim = '://';
38 const dbFilename = connectionString.slice(connectionString.indexOf(csDelim) + csDelim.length);
39
40 const queryLogLevel = options.db.queryLogLevel;
41
42 const sqliteOptions = {
43 ...(queryLogLevel && {
44 // eslint-disable-next-line security/detect-object-injection
45 verbose: (query) => this.logger[queryLogLevel](_fileScope('SQLite:verbose'), '', { query }),
46 }),
47 };
48 this.db = new SQLite(dbFilename, sqliteOptions);
49 this.schemaVersionsSupported = schemaVersionsSupported;
50 this.changesSinceLastOptimize = BigInt(0);
51 this.optimizeAfterChanges = options.db.connectionString.optimizeAfterChanges;
52 this.db.pragma('foreign_keys = on'); // Enforce consistency.
53 this.db.pragma('journal_mode = WAL'); // Be faster, expect local filesystem.
54 this.db.defaultSafeIntegers(true); // This probably isn't necessary, but by using these BigInts we keep weird floats out of the query logs.
55
56 this._initTables();
57 this._initStatements();
58 }
59
60
61 /**
62 * SQLite cannot prepare its statements without a schema, ensure such exists.
63 */
64 _initTables() {
65 const _scope = _fileScope('_initTables');
66
67 // Migrations rely upon this table, ensure it exists.
68 const metaVersionTable = '_meta_schema_version';
69 const tableExists = this.db.prepare('SELECT name FROM sqlite_master WHERE type=:type AND name=:name').pluck(true).bind({ type: 'table', name: metaVersionTable });
70 let metaExists = tableExists.get();
71 if (metaExists === undefined) {
72 const fPath = path.join(__dirname, 'sql', 'schema', 'init.sql');
73 // eslint-disable-next-line security/detect-non-literal-fs-filename
74 const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
75 this.db.exec(fSql);
76 metaExists = tableExists.get();
77 /* istanbul ignore if */
78 if (metaExists === undefined) {
79 throw new DBErrors.UnexpectedResult(`did not create ${metaVersionTable} table`);
80 }
81 this.logger.info(_scope, 'created schema version table', { metaVersionTable });
82 }
83
84 // Apply migrations
85 const currentSchema = this._currentSchema();
86 const migrationsWanted = svh.unappliedSchemaVersions(__dirname, currentSchema, this.schemaVersionsSupported);
87 this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
88 migrationsWanted.forEach((v) => {
89 const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
90 try {
91 // eslint-disable-next-line security/detect-non-literal-fs-filename
92 const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
93 this.logger.debug(_scope, 'applying migration', { version: v });
94 const results = this.db.exec(fSql);
95 this.logger.debug(_scope, 'migration results', { results });
96 this.logger.info(_scope, 'applied migration', { version: v });
97 } catch (e) {
98 this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v });
99 throw e;
100 }
101 });
102 }
103
104
105 _initStatements() {
106 const _scope = _fileScope('_initStatements');
107 const sqlDir = path.join(__dirname, 'sql');
108 this.statement = {};
109
110 // Decorate the statement calls we use with timing and logging.
111 const wrapFetch = (logName, statementName, fn) => {
112 const _wrapScope = _fileScope(logName);
113 return (...args) => {
114 const startTimestampMs = performance.now();
115 const rows = fn(...args);
116 DatabaseSQLite._deOphidiate(rows);
117 const elapsedTimeMs = performance.now() - startTimestampMs;
118 this.logger.debug(_wrapScope, 'complete', { statementName, elapsedTimeMs });
119 return rows;
120 };
121 };
122 const wrapRun = (logName, statementName, fn) => {
123 const _wrapScope = _fileScope(logName);
124 return (...args) => {
125 const startTimestampMs = performance.now();
126 const result = fn(...args);
127 const elapsedTimeMs = performance.now() - startTimestampMs;
128 this.logger.debug(_wrapScope, 'complete', { ...result, statementName, elapsedTimeMs });
129 result.duration = elapsedTimeMs;
130 return result;
131 };
132 };
133
134 // eslint-disable-next-line security/detect-non-literal-fs-filename
135 for (const f of fs.readdirSync(sqlDir)) {
136 const fPath = path.join(sqlDir, f);
137 const { name: fName, ext: fExt } = path.parse(f);
138 // eslint-disable-next-line security/detect-non-literal-fs-filename
139 const stat = fs.statSync(fPath);
140 if (!stat.isFile()
141 || fExt.toLowerCase() !== '.sql') {
142 continue;
143 }
144 // eslint-disable-next-line security/detect-non-literal-fs-filename
145 const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
146 const statementName = Database._camelfy(fName.toLowerCase(), '-');
147 let statement;
148 try {
149 statement = this.db.prepare(fSql);
150 } catch (e) {
151 /* istanbul ignore next */
152 this.logger.error(_scope, 'failed to prepare statement', { error: e, file: f });
153 /* istanbul ignore next */
154 throw e;
155 }
156 // eslint-disable-next-line security/detect-object-injection
157 this.statement[statementName] = statement;
158 const { get: origGet, all: origAll, run: origRun } = statement;
159 statement.get = wrapFetch('SQLite:get', statementName, origGet.bind(statement));
160 statement.all = wrapFetch('SQLite:all', statementName, origAll.bind(statement));
161 statement.run = wrapRun('SQLite:run', statementName, origRun.bind(statement));
162 }
163 this.statement._optimize = this.db.prepare('SELECT * FROM pragma_optimize(0x03)');
164
165 this.logger.debug(_scope, 'statements initialized', { statements: Object.keys(this.statement).length });
166 }
167
168
169 static _deOphidiate(rows) {
170 const rowsIsArray = Array.isArray(rows);
171 if (!rowsIsArray) {
172 rows = [rows];
173 }
174 const exemplaryRow = rows[0];
175 for (const prop in exemplaryRow) {
176 const camel = Database._camelfy(prop);
177 if (!(camel in exemplaryRow)) {
178 for (const d of rows) {
179 // eslint-disable-next-line security/detect-object-injection
180 d[camel] = d[prop];
181 // eslint-disable-next-line security/detect-object-injection
182 delete d[prop];
183 }
184 }
185 }
186 return rowsIsArray ? rows : rows[0];
187 }
188
189
190 _currentSchema() {
191 return this.db.prepare('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1').get();
192 }
193
194
195 healthCheck() {
196 const _scope = _fileScope('healthCheck');
197 this.logger.debug(_scope, 'called', {});
198 if (!this.db.open) {
199 throw new DBErrors.UnexpectedResult('database is not open');
200 }
201 return { open: this.db.open };
202 }
203
204
205 _engineInfo(result) {
206 if (result.changes) {
207 this.changesSinceLastOptimize += BigInt(result.changes);
208 this._optimize();
209 }
210 return {
211 changes: Number(result.changes),
212 lastInsertRowid: result.lastInsertRowid,
213 };
214 }
215
216
217 _closeConnection() {
218 this.db.close();
219 }
220
221
222 _optimize() {
223 const _scope = _fileScope('_optimize');
224
225 if (this.optimizeAfterChanges
226 && this.changesSinceLastOptimize >= this.optimizeAfterChanges) {
227 const optimize = this.statement._optimize.all();
228 this.logger.debug(_scope, 'optimize', { optimize });
229 this.db.pragma('optimize');
230 this.changesSinceLastOptimize = BigInt(0);
231 }
232 }
233
234
235 _purgeTables(really) {
236 if (really) {
237 [
238 'topic',
239 'topic_fetch_in_progress',
240 'verification',
241 'verification_in_progress',
242 'subscription',
243 'subscription_delivery_in_progress',
244 ].map((table) => {
245 const result = this.db.prepare(`DELETE FROM ${table}`).run();
246 this.logger.debug(_fileScope('_purgeTables'), 'success', { table, result });
247 });
248 }
249 }
250
251
252 context(fn) {
253 return fn(this.db);
254 }
255
256
257 transaction(dbCtx, fn) {
258 dbCtx = dbCtx || this.db;
259 return dbCtx.transaction(fn)();
260 }
261
262
263 authenticationSuccess(dbCtx, identifier) {
264 const _scope = _fileScope('authenticationSuccess');
265 this.logger.debug(_scope, 'called', { identifier });
266
267 let result;
268 try {
269 result = this.statement.authenticationSuccess.run({ identifier });
270 if (result.changes != 1) {
271 throw new DBErrors.UnexpectedResult('did not update authentication success');
272 }
273 } catch (e) {
274 this.logger.error(_scope, 'failed', { error: e, identifier });
275 throw e;
276 }
277 }
278
279
280 authenticationGet(dbCtx, identifier) {
281 const _scope = _fileScope('authenticationGet');
282 this.logger.debug(_scope, 'called', { identifier });
283
284 try {
285 return this.statement.authenticationGet.get({ identifier });
286 } catch (e) {
287 this.logger.error(_scope, 'failed', { error: e, identifier });
288 throw e;
289 }
290 }
291
292
293 authenticationUpsert(dbCtx, identifier, credential) {
294 const _scope = _fileScope('authenticationUpsert');
295 const scrubbedCredential = '*'.repeat((credential || '').length);
296 this.logger.debug(_scope, 'called', { identifier, scrubbedCredential });
297
298 let result;
299 try {
300 result = this.statement.authenticationUpsert.run({ identifier, credential });
301 if (result.changes != 1) {
302 throw new DBErrors.UnexpectedResult('did not upsert authentication');
303 }
304 } catch (e) {
305 this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential })
306 throw e;
307 }
308 }
309
310
311 /**
312 * Converts engine subscription fields to native types.
313 * @param {Object} data
314 */
315 static _subscriptionDataToNative(data) {
316 if (data) {
317 ['created', 'verified', 'expires', 'contentDelivered'].forEach((field) => {
318 // eslint-disable-next-line security/detect-object-injection
319 data[field] = epochToDate(data[field]);
320 });
321 }
322 return data;
323 }
324
325
326 subscriptionsByTopicId(dbCtx, topicId) {
327 const _scope = _fileScope('subscriptionsByTopicId');
328 this.logger.debug(_scope, 'called', { topicId });
329
330 try {
331 const subscriptions = this.statement.subscriptionsByTopicId.all({ topicId });
332 return subscriptions.map((s) => DatabaseSQLite._subscriptionDataToNative(s));
333 } catch (e) {
334 this.logger.error(_scope, 'failed', { error: e, topicId });
335 throw e;
336 }
337 }
338
339
340 subscriptionCountByTopicUrl(dbCtx, topicUrl) {
341 const _scope = _fileScope('subscriptionCountByTopicUrl');
342 this.logger.debug(_scope, 'called', { topicUrl });
343
344 try {
345 return this.statement.subscriptionCountByTopicUrl.get({ topicUrl });
346 } catch (e) {
347 this.logger.error(_scope, 'failed', { error: e, topicUrl });
348 throw e;
349 }
350 }
351
352
353 subscriptionDelete(dbCtx, callback, topicId) {
354 const _scope = _fileScope('subscriptionDelete');
355 this.logger.debug(_scope, 'called', { callback, topicId });
356
357 try {
358 const result = this.statement.subscriptionDelete.run({ callback, topicId });
359 if (result.changes != 1) {
360 throw new DBErrors.UnexpectedResult('did not delete subscription');
361 }
362 return this._engineInfo(result);
363 } catch (e) {
364 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
365 throw e;
366 }
367 }
368
369
370 subscriptionDeleteExpired(dbCtx, topicId) {
371 const _scope = _fileScope('subscriptionDeleteExpired');
372 this.logger.debug(_scope, 'called', { topicId });
373
374 try {
375 const result = this.statement.subscriptionDeleteExpired.run({ topicId });
376 this.logger.debug(_scope, 'success', { topicId, deleted: result.changes });
377 return this._engineInfo(result);
378 } catch (e) {
379 this.logger.error(_scope, 'failed', { error: e, topicId });
380 throw e;
381 }
382 }
383
384
385 subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
386 const _scope = _fileScope('subscriptionDeliveryClaim');
387 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
388
389 let subscriptionIds;
390 try {
391 this.db.transaction(() => {
392 subscriptionIds = this.statement.subscriptionDeliveryNeeded.all({ wanted }).map((claim) => claim.id);
393 subscriptionIds.forEach((subscriptionId) => {
394 const result = this.statement.subscriptionDeliveryClaimById.run({ subscriptionId, claimTimeoutSeconds, claimant });
395 if (result.changes != 1) {
396 throw new DBErrors.UnexpectedResult('did not claim subscription delivery');
397 }
398 });
399 })();
400 return subscriptionIds;
401 } catch (e) {
402 this.logger.error(_scope, 'failed', { error: e, wanted, claimTimeoutSeconds, claimant, subscriptionIds });
403 throw e;
404 }
405 }
406
407
408 subscriptionDeliveryClaimById(dbCtx, subscriptionId, claimTimeoutSeconds, claimant) {
409 const _scope = _fileScope('subscriptionDeliveryClaimById');
410 this.logger.debug(_scope, 'called', { subscriptionId, claimTimeoutSeconds, claimant });
411
412 try {
413 const result = this.statement.subscriptionDeliveryClaimById.run({ subscriptionId, claimTimeoutSeconds, claimant });
414 if (result.changes != 1) {
415 throw new DBErrors.UnexpectedResult('did not claim subscription delivery');
416 }
417 return this._engineInfo(result);
418 } catch (e) {
419 this.logger.error(_scope, 'failed', { error: e, subscriptionId, claimTimeoutSeconds, claimant });
420 throw e;
421 }
422 }
423
424
425 subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) {
426 const _scope = _fileScope('subscriptionDeliveryComplete');
427 this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated });
428
429 let result;
430 try {
431 this.db.transaction(() => {
432 topicContentUpdated = dateToEpoch(topicContentUpdated);
433 result = this.statement.subscriptionDeliverySuccess.run({ callback, topicId, topicContentUpdated });
434 if (result.changes != 1) {
435 throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
436 }
437 result = this.statement.subscriptionDeliveryDone.run({ callback, topicId });
438 if (result.changes != 1) {
439 throw new DBErrors.UnexpectedResult('did not complete subscription delivery');
440 }
441 })();
442 return this._engineInfo(result);
443 } catch (e) {
444 this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated });
445 throw e;
446 }
447 }
448
449
450 subscriptionDeliveryGone(dbCtx, callback, topicId) {
451 const _scope = _fileScope('subscriptionDeliveryGone');
452 this.logger.debug(_scope, 'called', { callback, topicId });
453
454 let result;
455 try {
456 this.db.transaction(() => {
457 result = this.statement.subscriptionDelete.run({ callback, topicId });
458 if (result.changes != 1) {
459 throw new DBErrors.UnexpectedResult('did not delete subscription');
460 }
461 // Delete cascades to delivery
462 // result = this.statement.subscriptionDeliveryDone.run({ callback, topicId });
463 // if (result.changes != 1) {
464 // throw new DBErrors.UnexpectedResult('did not complete subscription delivery');
465 // }
466 })();
467 return this._engineInfo(result);
468 } catch (e) {
469 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
470 throw e;
471 }
472 }
473
474
475 subscriptionDeliveryIncomplete(dbCtx, callback, topicId, retryDelays = [60]) {
476 const _scope = _fileScope('subscriptionDeliveryIncomplete');
477 this.logger.debug(_scope, 'called', { callback, topicId, retryDelays });
478
479 let result;
480 try {
481 this.db.transaction(() => {
482 const { currentAttempt } = this.statement.subscriptionDeliveryAttempts.get({ callback, topicId });
483 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
484 result = this.statement.subscriptionDeliveryFailure.run({ nextAttemptDelaySeconds, callback, topicId });
485 if (result.changes != 1) {
486 throw new DBErrors.UnexpectedResult('did not set delivery failure');
487 }
488 result = this.statement.subscriptionDeliveryDone.run({ callback, topicId });
489 if (result.changes != 1) {
490 throw new DBErrors.UnexpectedResult('did not complete subscription delivery');
491 }
492 })();
493 } catch (e) {
494 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
495 throw e;
496 }
497 }
498
499
500 subscriptionGet(dbCtx, callback, topicId) {
501 const _scope = _fileScope('subscriptionGet');
502 this.logger.debug(_scope, 'called', { callback, topicId });
503
504 let subscription;
505 try {
506 subscription = this.statement.subscriptionGet.get({ callback, topicId });
507 return DatabaseSQLite._subscriptionDataToNative(subscription);
508 } catch (e) {
509 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
510 throw e;
511 }
512 }
513
514
515 subscriptionGetById(dbCtx, subscriptionId) {
516 const _scope = _fileScope('subscriptionGetById');
517 this.logger.debug(_scope, 'called', { subscriptionId });
518
519 let subscription;
520 try {
521 subscription = this.statement.subscriptionGetById.get({ subscriptionId });
522 return DatabaseSQLite._subscriptionDataToNative(subscription);
523 } catch (e) {
524 this.logger.error(_scope, 'failed', { error: e, subscriptionId });
525 throw e;
526 }
527 }
528
529
530 subscriptionUpdate(dbCtx, data) {
531 const _scope = _fileScope('subscriptionUpdate');
532 this.logger.debug(_scope, 'called', { data });
533
534 const subscriptionData = {
535 ...data,
536 };
537
538 this._subscriptionUpdateDataValidate(subscriptionData);
539
540 try {
541 const result = this.statement.subscriptionUpdate.run(subscriptionData);
542 if (result.changes != 1) {
543 throw new DBErrors.UnexpectedResult('did not update subscription');
544 }
545 } catch (e) {
546 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
547 throw e;
548 }
549 }
550
551
552 subscriptionUpsert(dbCtx, data) {
553 const _scope = _fileScope('subscriptionUpsert');
554 this.logger.debug(_scope, 'called', { ...data });
555
556 const subscriptionData = {
557 secret: null,
558 httpRemoteAddr: null,
559 httpFrom: null,
560 ...data,
561 }
562 this._subscriptionUpsertDataValidate(subscriptionData);
563
564 let result;
565 try {
566 result = this.statement.subscriptionUpsert.run(subscriptionData);
567 if (result.changes != 1) {
568 throw new DBErrors.UnexpectedResult('did not upsert subscription');
569 }
570 return this._engineInfo(result);
571 } catch (e) {
572 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
573 throw e;
574 }
575 }
576
577
578 topicDeleted(dbCtx, topicId) {
579 const _scope = _fileScope('topicDeleted');
580 this.logger.debug(_scope, 'called', { topicId });
581
582 let result;
583 try {
584 result = this.statement.topicDeleted.run({ topicId });
585 if (result.changes != 1) {
586 throw new DBErrors.UnexpectedResult('did not update topic as deleted');
587 }
588 } catch (e) {
589 this.logger.error(_scope, 'failed', { error: e, topicId });
590 throw e;
591 }
592 }
593
594
595 topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
596 const _scope = _fileScope('topicFetchClaim');
597 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
598
599 let topicIds;
600 try {
601 this.db.transaction(() => {
602 topicIds = this.statement.topicContentFetchNeeded.all({ wanted }).map((claim) => claim.id);
603 topicIds.forEach((topicId) => {
604 const result = this.statement.topicContentFetchClaimById.run({ topicId, claimTimeoutSeconds, claimant });
605 if (result.changes != 1) {
606 throw new DBErrors.UnexpectedResult('did not claim topic fetch');
607 }
608 });
609 })();
610 return topicIds;
611 } catch (e) {
612 this.logger.error(_scope, 'failed to claim topics for fetch', { error: e, wanted, claimTimeoutSeconds, claimant, topicIds });
613 throw e;
614 }
615 }
616
617
618 topicFetchClaimById(dbCtx, topicId, claimTimeoutSeconds, claimant) {
619 const _scope = _fileScope('topicFetchClaimById');
620 this.logger.debug(_scope, 'called', { topicId, claimTimeoutSeconds, claimant });
621
622 let result;
623 try {
624 result = this.statement.topicContentFetchClaimById.run({ topicId, claimTimeoutSeconds, claimant });
625 if (result.changes != 1) {
626 throw new DBErrors.UnexpectedResult('did not claim topic fetch');
627 }
628 return this._engineInfo(result);
629 } catch (e) {
630 this.logger.error(_scope, 'failed to claim topics for fetch', { error: e, topicId, claimTimeoutSeconds, claimant });
631 throw e;
632 }
633 }
634
635
636 topicFetchComplete(dbCtx, topicId) {
637 const _scope = _fileScope('topicFetchComplete');
638 this.logger.debug(_scope, 'called', { topicId });
639
640 let result;
641 try {
642 this.db.transaction(() => {
643 result = this.statement.topicAttemptsReset.run({ topicId, forever: EPOCH_FOREVER });
644 if (result.changes != 1) {
645 throw new DBErrors.UnexpectedResult('did not reset topic attempts');
646 }
647 result = this.statement.topicContentFetchDone.run({ topicId });
648 if (result.changes != 1) {
649 throw new DBErrors.UnexpectedResult('did not release topic fetch');
650 }
651 })();
652 return this._engineInfo(result);
653 } catch (e) {
654 this.logger.error(_scope, 'failed', { error: e, result, topicId });
655 throw e;
656 }
657 }
658
659
660 topicFetchIncomplete(dbCtx, topicId, retryDelays = [60]) {
661 const _scope = _fileScope('topicFetchIncomplete');
662 this.logger.debug(_scope, 'called', { topicId });
663
664 let result;
665 try {
666 this.db.transaction(() => {
667 const { contentFetchAttemptsSinceSuccess: currentAttempt } = this.statement.topicAttempts.get({ topicId });
668 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
669 result = this.statement.topicAttemptsIncrement.run({ topicId, nextAttemptDelaySeconds });
670 if (result.changes != 1) {
671 throw new DBErrors.UnexpectedResult('did not set topic attempts');
672 }
673 result = this.statement.topicContentFetchDone.run({ topicId });
674 if (result.changes != 1) {
675 throw new DBErrors.UnexpectedResult('did not release topic fetch');
676 }
677 return result;
678 })();
679 return this._engineInfo(result);
680 } catch (e) {
681 this.logger.error(_scope, 'failed', { error: e, result, topicId });
682 throw e;
683 }
684 }
685
686
687 topicFetchRequested(dbCtx, topicId) {
688 const _scope = _fileScope('topicFetchRequested');
689 this.logger.debug(_scope, 'called', { topicId });
690
691 let result;
692 try {
693 result = this.statement.topicContentFetchRequested.run({ topicId });
694 if (result.changes != 1) {
695 throw new DBErrors.UnexpectedResult('did not set topic fetch requested');
696 }
697 return this._engineInfo(result);
698 } catch (e) {
699 this.logger.error(_scope, 'failed', { error: e, topicId });
700 throw e;
701 }
702 }
703
704
705 /**
706 * Converts engine topic fields to native types.
707 * @param {Object} data
708 */
709 static _topicDataToNative(data) {
710 if (data) {
711 data.isActive = !!data.isActive;
712 data.isDeleted = !!data.isDeleted;
713 ['created', 'lastPublish', 'contentFetchNextAttempt', 'contentUpdated'].forEach((field) => {
714 // eslint-disable-next-line security/detect-object-injection
715 data[field] = epochToDate(data[field]);
716 });
717 }
718 return data;
719 }
720
721
722 // eslint-disable-next-line no-unused-vars
723 topicGetAll(dbCtx) {
724 const _scope = _fileScope('topicGetAll');
725 this.logger.debug(_scope, 'called');
726
727 let topics;
728 try {
729 topics = this.statement.topicGetInfoAll.all();
730 } catch (e) {
731 this.logger.error(_scope, 'failed', { error: e, topics });
732 throw e;
733 }
734 if (topics) {
735 topics = topics
736 .map(DatabaseSQLite._topicDataToNative)
737 .map(this._topicDefaults.bind(this));
738 }
739 return topics;
740 }
741
742
743 topicGetById(dbCtx, topicId, applyDefaults = true) {
744 const _scope = _fileScope('topicGetById');
745 this.logger.debug(_scope, 'called', { topicId });
746
747 let topic;
748 try {
749 topic = this.statement.topicGetById.get({ topicId });
750 DatabaseSQLite._topicDataToNative(topic);
751 if (applyDefaults) {
752 topic = this._topicDefaults(topic);
753 }
754 return topic;
755 } catch (e) {
756 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
757 throw e;
758 }
759 }
760
761
762 topicGetByUrl(dbCtx, topicUrl) {
763 const _scope = _fileScope('topicGetByUrl');
764 this.logger.debug(_scope, 'called', { topicUrl });
765
766 let topic;
767 try {
768 topic = this.statement.topicGetByUrl.get({ topicUrl });
769 DatabaseSQLite._topicDataToNative(topic);
770 return this._topicDefaults(topic);
771 } catch (e) {
772 this.logger.error(_scope, 'failed', { error: e, topic, topicUrl });
773 throw e;
774 }
775 }
776
777
778 topicGetContentById(dbCtx, topicId) {
779 const _scope = _fileScope('topicGetContentById');
780 this.logger.debug(_scope, 'called', { topicId });
781
782 let topic;
783 try {
784 topic = this.statement.topicGetContentById.get({ topicId });
785 DatabaseSQLite._topicDataToNative(topic);
786 return this._topicDefaults(topic);
787 } catch (e) {
788 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
789 throw e;
790 }
791 }
792
793
794 topicPendingDelete(dbCtx, topicId) {
795 const _scope = _fileScope('topicPendingDelete');
796 this.logger.debug(_scope, 'called', { topicId });
797
798 try {
799 this.db.transaction(() => {
800 const topic = this.statement.topicGetById.get({ topicId });
801 if (!topic.isDeleted) {
802 this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
803 return;
804 }
805
806 const { count: subscriberCount } = this.statement.subscriptionCountByTopicUrl.get({ topicUrl: topic.url });
807 if (subscriberCount) {
808 this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
809 return;
810 }
811
812 const result = this.statement.topicDeleteById.run({ topicId });
813 if (result.changes !== 1) {
814 throw new DBErrors.UnexpectedResult('did not delete topic');
815 }
816 })();
817 this.logger.debug(_scope, 'success', { topicId });
818 } catch (e) {
819 this.logger.error(_scope, 'failed', { error: e, topicId });
820 throw e;
821 }
822 }
823
824
825 topicPublishHistory(dbCtx, topicId, days) {
826 const _scope = _fileScope('topicPublishHistory');
827 this.logger.debug(_scope, 'called', { topicId, days })
828
829 const events = this.statement.topicPublishHistory.all({ topicId, daysAgo: days });
830 const history = Array.from({ length: days }, () => 0);
831 events.forEach(({ daysAgo, contentUpdates }) => history[daysAgo] = Number(contentUpdates));
832
833 return history;
834 }
835
836
837 topicSet(dbCtx, data) {
838 const _scope = _fileScope('topicSet');
839 this.logger.debug(_scope, 'called', data);
840
841 const topicSetData = {
842 publisherValidationUrl: null,
843 leaseSecondsPreferred: null,
844 leaseSecondsMin: null,
845 leaseSecondsMax: null,
846 ...data,
847 };
848
849 let result;
850 try {
851 this._topicSetDataValidate(topicSetData);
852 result = this.statement.topicUpsert.run(topicSetData);
853 if (result.changes != 1) {
854 throw new DBErrors.UnexpectedResult('did not set topic data');
855 }
856 return this._engineInfo(result);
857 } catch (e) {
858 this.logger.error(_scope, 'failed', { error: e, result });
859 throw e;
860 }
861 }
862
863
864 topicSetContent(dbCtx, data) {
865 const _scope = _fileScope('topicSetContent');
866 const topicSetContentData = {
867 contentType: null,
868 httpETag: null,
869 httpLastModified: null,
870 ...data,
871 };
872 const logData = {
873 ...topicSetContentData,
874 content: common.logTruncate(topicSetContentData.content, 100),
875 };
876 this.logger.debug(_scope, 'called', logData);
877
878 let result;
879 try {
880 this._topicSetContentDataValidate(topicSetContentData);
881 result = this.statement.topicSetContent.run(topicSetContentData);
882 logData.result = result;
883 if (result.changes != 1) {
884 throw new DBErrors.UnexpectedResult('did not set topic content');
885 }
886 result = this.statement.topicSetContentHistory.run({
887 topicId: data.topicId,
888 contentHash: data.contentHash,
889 contentSize: data.content.length,
890 });
891 if (result.changes != 1) {
892 throw new DBErrors.UnexpectedResult('did not set topic content history');
893 }
894 return this._engineInfo(result);
895 } catch (e) {
896 this.logger.error(_scope, 'failed', { error: e, ...logData });
897 throw e;
898 }
899 }
900
901
902 topicUpdate(dbCtx, data) {
903 const _scope = _fileScope('topicUpdate');
904 this.logger.debug(_scope, 'called', { data });
905
906 const topicData = {
907 leaseSecondsPreferred: null,
908 leaseSecondsMin: null,
909 leaseSecondsMax: null,
910 publisherValidationUrl: null,
911 ...data,
912 };
913
914 this._topicUpdateDataValidate(topicData);
915
916 try {
917 const result = this.statement.topicUpdate.run(topicData);
918 if (result.changes != 1) {
919 throw new DBErrors.UnexpectedResult('did not update topic');
920 }
921 } catch (e) {
922 this.logger.error(_scope, 'failed', { error: e, topicData });
923 throw e;
924 }
925 }
926
927
928 verificationClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
929 const _scope = _fileScope('verificationClaim');
930 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
931
932 let verificationIds;
933 try {
934 this.db.transaction(() => {
935 verificationIds = this.statement.verificationNeeded.all({ wanted }).map((claim) => claim.id);
936 verificationIds.forEach((verificationId) => {
937 const result = this.statement.verificationClaimById.run({ verificationId, claimTimeoutSeconds, claimant });
938 if (result.changes != 1) {
939 throw new DBErrors.UnexpectedResult('did not claim verification');
940 }
941 });
942 })();
943 return verificationIds;
944 } catch (e) {
945 this.logger.error(_scope, 'failed to claim verifications', { wanted, claimTimeoutSeconds });
946 throw e;
947 }
948 }
949
950
951 verificationClaimById(dbCtx, verificationId, claimTimeoutSeconds, claimant) {
952 const _scope = _fileScope('verificationClaimById');
953 this.logger.debug(_scope, 'called', { verificationId, claimTimeoutSeconds, claimant });
954
955 let result;
956 try {
957 result = this.statement.verificationClaimById.run({ verificationId, claimTimeoutSeconds, claimant });
958 if (result.changes != 1) {
959 throw new DBErrors.UnexpectedResult('did not claim verification');
960 }
961 return this._engineInfo(result);
962 } catch (e) {
963 this.logger.error(_scope, 'failed to claim verification', { error: e, verificationId, claimTimeoutSeconds, claimant });
964 throw e;
965 }
966 }
967
968
969 verificationComplete(dbCtx, verificationId, callback, topicId) {
970 const _scope = _fileScope('verificationComplete');
971 this.logger.debug(_scope, 'called', { verificationId });
972
973 let result;
974 try {
975 this.db.transaction(() => {
976 result = this.statement.verificationScrub.run({ verificationId, callback, topicId });
977 if (result.changes < 1) {
978 throw new DBErrors.UnexpectedResult('did not remove verifications');
979 }
980 })();
981 } catch (e) {
982 this.logger.error(_scope, 'failed', { verificationId });
983 throw e;
984 }
985 return this._engineInfo(result);
986 }
987
988
989 /**
990 * Converts engine verification fields to native types.
991 * @param {Object} data
992 */
993 static _verificationDataToNative(data) {
994 if (data) {
995 data.isPublisherValidated = !!data.isPublisherValidated;
996 }
997 }
998
999
1000 verificationGetById(dbCtx, verificationId) {
1001 const _scope = _fileScope('verificationGetById');
1002 this.logger.debug(_scope, 'called', { verificationId });
1003
1004 let verification;
1005 try {
1006 verification = this.statement.verificationGetById.get({ verificationId });
1007 DatabaseSQLite._verificationDataToNative(verification);
1008 return verification;
1009 } catch (e) {
1010 this.logger.error(_scope, 'failed', { error: e, verificationId });
1011 throw e;
1012 }
1013 }
1014
1015
1016 verificationIncomplete(dbCtx, verificationId, retryDelays = [60]) {
1017 const _scope = _fileScope('verificationIncomplete');
1018 this.logger.debug(_scope, 'called', { verificationId });
1019
1020 let result;
1021 try {
1022 this.db.transaction(() => {
1023 const { attempts: currentAttempt } = this.statement.verificationAttempts.get({ verificationId });
1024 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
1025 result = this.statement.verificationAttemptsIncrement.run({ verificationId, nextAttemptDelaySeconds });
1026 if (result.changes != 1) {
1027 throw new DBErrors.UnexpectedResult('did not increment verification attempts');
1028 }
1029 result = this.statement.verificationDone.run({ verificationId });
1030 if (result.changes != 1) {
1031 throw new DBErrors.UnexpectedResult('did not release verification in progress');
1032 }
1033 return result;
1034 })();
1035 return this._engineInfo(result);
1036 } catch (e) {
1037 this.logger.error(_scope, 'failed', { error: e, result, verificationId });
1038 throw e;
1039 }
1040 }
1041
1042
1043 /**
1044 * Convert native verification fields to engine types.
1045 */
1046 static _verificationDataToEngine(data) {
1047 if (data) {
1048 data.isPublisherValidated = data.isPublisherValidated ? 1 : 0;
1049 }
1050 }
1051
1052
1053 verificationInsert(dbCtx, verification) {
1054 const _scope = _fileScope('verificationInsert');
1055 this.logger.debug(_scope, 'called', { verification });
1056
1057 const verificationData = {
1058 secret: null,
1059 httpRemoteAddr: null,
1060 httpFrom: null,
1061 requestId: null,
1062 ...verification,
1063 };
1064
1065 let result, verificationId;
1066 try {
1067 this._verificationDataValidate(verificationData);
1068 DatabaseSQLite._verificationDataToEngine(verificationData);
1069 result = this.statement.verificationInsert.run(verificationData);
1070 if (result.changes != 1) {
1071 throw new DBErrors.UnexpectedResult('did not insert verification');
1072 }
1073 verificationId = result.lastInsertRowid;
1074 this.logger.debug(_scope, 'inserted verification', { verificationId });
1075
1076 return verificationId;
1077 } catch (e) {
1078 this.logger.error(_scope, 'failed', { error: e, verificationData });
1079 throw e;
1080 }
1081 }
1082
1083
1084 verificationRelease(dbCtx, verificationId) {
1085 const _scope = _fileScope('verificationRelease');
1086 this.logger.debug(_scope, 'called', { verificationId });
1087
1088 let result;
1089 try {
1090 result = this.statement.verificationDone.run({ verificationId });
1091 if (result.changes != 1) {
1092 throw new DBErrors.UnexpectedResult('did not release verification');
1093 }
1094 } catch (e) {
1095 this.logger.error(_scope, 'failed', { error: e, verificationId });
1096 throw e;
1097 }
1098 }
1099
1100
1101 verificationUpdate(dbCtx, verificationId, data) {
1102 const _scope = _fileScope('verificationUpdate');
1103 this.logger.debug(_scope, 'called', { verificationId, data });
1104
1105 const verificationData = {
1106 reason: null,
1107 verificationId,
1108 ...data,
1109 };
1110
1111 let result;
1112 try {
1113 this._verificationUpdateDataValidate(verificationData);
1114 DatabaseSQLite._verificationDataToEngine(verificationData);
1115 result = this.statement.verificationUpdate.run(verificationData);
1116 if (result.changes != 1) {
1117 throw new DBErrors.UnexpectedResult('did not update verification');
1118 }
1119 } catch (e) {
1120 this.logger.error(_scope, 'failed', { error: e, verificationData });
1121 throw e;
1122 }
1123 }
1124
1125
1126 verificationValidated(dbCtx, verificationId) {
1127 const _scope = _fileScope('verificationValidated');
1128 this.logger.debug(_scope, 'called', { verificationId });
1129
1130 let result;
1131 try {
1132 result = this.statement.verificationValidate.run({ verificationId });
1133 if (result.changes != 1) {
1134 throw new DBErrors.UnexpectedResult('did not set verification validation');
1135 }
1136 } catch (e) {
1137 this.logger.error(_scope, 'failed', { error: e, verificationId });
1138 throw e;
1139 }
1140 }
1141
1142
1143 }
1144
1145 module.exports = DatabaseSQLite;