Initial release
[websub-hub] / src / db / sqlite / index.js
1 'use strict';
2
3 const common = require('../../common');
4 const Database = require('../base');
5 const DBErrors = require('../errors');
6 const svh = require('../schema-version-helper');
7 const SQLite = require('better-sqlite3');
8 const fs = require('fs');
9 const path = require('path');
10 const { performance } = require('perf_hooks');
11
12 const _fileScope = common.fileScope(__filename);
13
// Lowest and highest schema versions this engine can work with; handed to
// the schema version helper when deciding which migrations to apply.
const schemaVersionsSupported = {
  min: { major: 1, minor: 0, patch: 0 },
  max: { major: 1, minor: 0, patch: 0 },
};
26
// Largest signed 64-bit integer (2^63 - 1), used as a timestamp which is
// effectively never reached.
const EPOCH_FOREVER = (BigInt(2) ** BigInt(63)) - BigInt(1);
29
30 class DatabaseSQLite extends Database {
31 constructor(logger, options) {
32 super(logger, options);
33
34 const connectionString = options.db.connectionString || 'sqlite://:memory:';
35 const csDelim = '://';
36 const dbFilename = connectionString.slice(connectionString.indexOf(csDelim) + csDelim.length);
37
38 const queryLogLevel = options.db.queryLogLevel;
39
40 const sqliteOptions = {
41 ...(queryLogLevel && {
42 // eslint-disable-next-line security/detect-object-injection
43 verbose: (query) => this.logger[queryLogLevel](_fileScope('SQLite:verbose'), '', { query }),
44 }),
45 };
46 this.db = new SQLite(dbFilename, sqliteOptions);
47 this.schemaVersionsSupported = schemaVersionsSupported;
48 this.changesSinceLastOptimize = BigInt(0);
49 this.optimizeAfterChanges = options.db.connectionString.optimizeAfterChanges;
50 this.db.pragma('foreign_keys = on'); // Enforce consistency.
51 this.db.pragma('journal_mode = WAL'); // Be faster, expect local filesystem.
52 this.db.defaultSafeIntegers(true); // This probably isn't necessary, but by using these BigInts we keep weird floats out of the query logs.
53
54 this._initTables();
55 this._initStatements();
56 }
57
58
59 /**
60 * SQLite cannot prepare its statements without a schema, ensure such exists.
61 */
62 _initTables() {
63 const _scope = _fileScope('_initTables');
64
65 // Migrations rely upon this table, ensure it exists.
66 const metaVersionTable = '_meta_schema_version';
67 const tableExists = this.db.prepare('SELECT name FROM sqlite_master WHERE type=:type AND name=:name').pluck(true).bind({ type: 'table', name: metaVersionTable });
68 let metaExists = tableExists.get();
69 if (metaExists === undefined) {
70 const fPath = path.join(__dirname, 'sql', 'schema', 'init.sql');
71 // eslint-disable-next-line security/detect-non-literal-fs-filename
72 const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
73 this.db.exec(fSql);
74 metaExists = tableExists.get();
75 /* istanbul ignore if */
76 if (metaExists === undefined) {
77 throw new DBErrors.UnexpectedResult(`did not create ${metaVersionTable} table`);
78 }
79 this.logger.info(_scope, 'created schema version table', { metaVersionTable });
80 }
81
82 // Apply migrations
83 const currentSchema = this._currentSchema();
84 const migrationsWanted = svh.unappliedSchemaVersions(__dirname, currentSchema, this.schemaVersionsSupported);
85 this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
86 migrationsWanted.forEach((v) => {
87 const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
88 // eslint-disable-next-line security/detect-non-literal-fs-filename
89 const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
90 this.logger.info(_scope, 'applying migration', { version: v });
91 this.db.exec(fSql);
92 });
93 }
94
95
96 _initStatements() {
97 const _scope = _fileScope('_initStatements');
98 const sqlDir = path.join(__dirname, 'sql');
99 this.statement = {};
100
101 // Decorate the statement calls we use with timing and logging.
102 const wrapFetch = (logName, statementName, fn) => {
103 const _wrapScope = _fileScope(logName);
104 return (...args) => {
105 const startTimestampMs = performance.now();
106 const rows = fn(...args);
107 DatabaseSQLite._deOphidiate(rows);
108 const elapsedTimeMs = performance.now() - startTimestampMs;
109 this.logger.debug(_wrapScope, 'complete', { statementName, elapsedTimeMs });
110 return rows;
111 };
112 };
113 const wrapRun = (logName, statementName, fn) => {
114 const _wrapScope = _fileScope(logName);
115 return (...args) => {
116 const startTimestampMs = performance.now();
117 const result = fn(...args);
118 const elapsedTimeMs = performance.now() - startTimestampMs;
119 this.logger.debug(_wrapScope, 'complete', { ...result, statementName, elapsedTimeMs });
120 result.duration = elapsedTimeMs;
121 return result;
122 };
123 };
124
125 // eslint-disable-next-line security/detect-non-literal-fs-filename
126 for (const f of fs.readdirSync(sqlDir)) {
127 const fPath = path.join(sqlDir, f);
128 const { name: fName, ext: fExt } = path.parse(f);
129 // eslint-disable-next-line security/detect-non-literal-fs-filename
130 const stat = fs.statSync(fPath);
131 if (!stat.isFile()
132 || fExt.toLowerCase() !== '.sql') {
133 continue;
134 }
135 // eslint-disable-next-line security/detect-non-literal-fs-filename
136 const fSql = fs.readFileSync(fPath, { encoding: 'utf8' });
137 const statementName = Database._camelfy(fName.toLowerCase(), '-');
138 let statement;
139 try {
140 statement = this.db.prepare(fSql);
141 } catch (e) {
142 /* istanbul ignore next */
143 this.logger.error(_scope, 'failed to prepare statement', { error: e, file: f });
144 /* istanbul ignore next */
145 throw e;
146 }
147 // eslint-disable-next-line security/detect-object-injection
148 this.statement[statementName] = statement;
149 const { get: origGet, all: origAll, run: origRun } = statement;
150 statement.get = wrapFetch('SQLite:get', statementName, origGet.bind(statement));
151 statement.all = wrapFetch('SQLite:all', statementName, origAll.bind(statement));
152 statement.run = wrapRun('SQLite:run', statementName, origRun.bind(statement));
153 }
154 this.statement._optimize = this.db.prepare('SELECT * FROM pragma_optimize(0x03)');
155
156 this.logger.debug(_scope, 'statements initialized', { statements: Object.keys(this.statement).length });
157 }
158
159
160 static _deOphidiate(rows) {
161 const rowsIsArray = Array.isArray(rows);
162 if (!rowsIsArray) {
163 rows = [rows];
164 }
165 const exemplaryRow = rows[0];
166 for (const prop in exemplaryRow) {
167 const camel = Database._camelfy(prop);
168 if (!(camel in exemplaryRow)) {
169 for (const d of rows) {
170 // eslint-disable-next-line security/detect-object-injection
171 d[camel] = d[prop];
172 // eslint-disable-next-line security/detect-object-injection
173 delete d[prop];
174 }
175 }
176 }
177 return rowsIsArray ? rows : rows[0];
178 }
179
180
181 _currentSchema() {
182 return this.db.prepare('SELECT major, minor, patch FROM _meta_schema_version ORDER BY major DESC, minor DESC, patch DESC LIMIT 1').get();
183 }
184
185
186 healthCheck() {
187 const _scope = _fileScope('healthCheck');
188 this.logger.debug(_scope, 'called', {});
189 if (!this.db.open) {
190 throw new DBErrors.UnexpectedResult('database is not open');
191 }
192 return { open: this.db.open };
193 }
194
195
196 _engineInfo(result) {
197 if (result.changes) {
198 this.changesSinceLastOptimize += BigInt(result.changes);
199 this._optimize();
200 }
201 return {
202 changes: Number(result.changes),
203 lastInsertRowid: result.lastInsertRowid,
204 };
205 }
206
207
208 _closeConnection() {
209 this.db.close();
210 }
211
212
213 _optimize() {
214 const _scope = _fileScope('_optimize');
215
216 if (this.optimizeAfterChanges
217 && this.changesSinceLastOptimize >= this.optimizeAfterChanges) {
218 const optimize = this.statement._optimize.all();
219 this.logger.debug(_scope, 'optimize', { optimize });
220 this.db.pragma('optimize');
221 this.changesSinceLastOptimize = BigInt(0);
222 }
223 }
224
225
226 _purgeTables(really) {
227 if (really) {
228 [
229 'topic',
230 'topic_fetch_in_progress',
231 'verification',
232 'verification_in_progress',
233 'subscription',
234 'subscription_delivery_in_progress',
235 ].map((table) => {
236 const result = this.db.prepare(`DELETE FROM ${table}`).run();
237 this.logger.debug(_fileScope('_purgeTables'), 'success', { table, result });
238 });
239 }
240 }
241
242
243 context(fn) {
244 return fn(this.db);
245 }
246
247
248 transaction(dbCtx, fn) {
249 dbCtx = dbCtx || this.db;
250 return dbCtx.transaction(fn)();
251 }
252
253
254 authenticationSuccess(dbCtx, identifier) {
255 const _scope = _fileScope('authenticationSuccess');
256 this.logger.debug(_scope, 'called', { identifier });
257
258 let result;
259 try {
260 result = this.statement.authenticationSuccess.run({ identifier });
261 if (result.changes != 1) {
262 throw new DBErrors.UnexpectedResult('did not update authentication success');
263 }
264 } catch (e) {
265 this.logger.error(_scope, 'failed', { error: e, identifier });
266 throw e;
267 }
268 }
269
270
271 authenticationGet(dbCtx, identifier) {
272 const _scope = _fileScope('authenticationGet');
273 this.logger.debug(_scope, 'called', { identifier });
274
275 try {
276 return this.statement.authenticationGet.get({ identifier });
277 } catch (e) {
278 this.logger.error(_scope, 'failed', { error: e, identifier });
279 throw e;
280 }
281 }
282
283
284 authenticationUpsert(dbCtx, identifier, credential) {
285 const _scope = _fileScope('authenticationUpsert');
286 const scrubbedCredential = '*'.repeat((credential || '').length);
287 this.logger.debug(_scope, 'called', { identifier, scrubbedCredential });
288
289 let result;
290 try {
291 result = this.statement.authenticationUpsert.run({ identifier, credential });
292 if (result.changes != 1) {
293 throw new DBErrors.UnexpectedResult('did not upsert authentication');
294 }
295 } catch (e) {
296 this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential })
297 throw e;
298 }
299 }
300
301
302 subscriptionsByTopicId(dbCtx, topicId) {
303 const _scope = _fileScope('subscriptionsByTopicId');
304 this.logger.debug(_scope, 'called', { topicId });
305
306 try {
307 return this.statement.subscriptionsByTopicId.all({ topicId });
308 } catch (e) {
309 this.logger.error(_scope, 'failed', { error: e, topicId });
310 throw e;
311 }
312 }
313
314
315 subscriptionCountByTopicUrl(dbCtx, topicUrl) {
316 const _scope = _fileScope('subscriptionCountByTopicUrl');
317 this.logger.debug(_scope, 'called', { topicUrl });
318
319 try {
320 return this.statement.subscriptionCountByTopicUrl.get({ topicUrl });
321 } catch (e) {
322 this.logger.error(_scope, 'failed', { error: e, topicUrl });
323 throw e;
324 }
325 }
326
327
328 subscriptionDelete(dbCtx, callback, topicId) {
329 const _scope = _fileScope('subscriptionDelete');
330 this.logger.debug(_scope, 'called', { callback, topicId });
331
332 try {
333 const result = this.statement.subscriptionDelete.run({ callback, topicId });
334 if (result.changes != 1) {
335 throw new DBErrors.UnexpectedResult('did not delete subscription');
336 }
337 return this._engineInfo(result);
338 } catch (e) {
339 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
340 throw e;
341 }
342 }
343
344
345 subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
346 const _scope = _fileScope('subscriptionDeliveryClaim');
347 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
348
349 let subscriptionIds;
350 try {
351 this.db.transaction(() => {
352 subscriptionIds = this.statement.subscriptionDeliveryNeeded.all({ wanted }).map((claim) => claim.id);
353 subscriptionIds.forEach((subscriptionId) => {
354 const result = this.statement.subscriptionDeliveryClaimById.run({ subscriptionId, claimTimeoutSeconds, claimant });
355 if (result.changes != 1) {
356 throw new DBErrors.UnexpectedResult('did not claim subscription delivery');
357 }
358 });
359 })();
360 return subscriptionIds;
361 } catch (e) {
362 this.logger.error(_scope, 'failed', { error: e, wanted, claimTimeoutSeconds, claimant, subscriptionIds });
363 throw e;
364 }
365 }
366
367
368 subscriptionDeliveryClaimById(dbCtx, subscriptionId, claimTimeoutSeconds, claimant) {
369 const _scope = _fileScope('subscriptionDeliveryClaimById');
370 this.logger.debug(_scope, 'called', { subscriptionId, claimTimeoutSeconds, claimant });
371
372 try {
373 const result = this.statement.subscriptionDeliveryClaimById.run({ subscriptionId, claimTimeoutSeconds, claimant });
374 if (result.changes != 1) {
375 throw new DBErrors.UnexpectedResult('did not claim subscription delivery');
376 }
377 return this._engineInfo(result);
378 } catch (e) {
379 this.logger.error(_scope, 'failed', { error: e, subscriptionId, claimTimeoutSeconds, claimant });
380 throw e;
381 }
382 }
383
384
385 subscriptionDeliveryComplete(dbCtx, callback, topicId) {
386 const _scope = _fileScope('subscriptionDeliveryComplete');
387 this.logger.debug(_scope, 'called', { callback, topicId });
388
389 let result;
390 try {
391 this.db.transaction(() => {
392 result = this.statement.subscriptionDeliverySuccess.run({ callback, topicId });
393 if (result.changes != 1) {
394 throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
395 }
396 result = this.statement.subscriptionDeliveryDone.run({ callback, topicId });
397 if (result.changes != 1) {
398 throw new DBErrors.UnexpectedResult('did not complete subscription delivery');
399 }
400 })();
401 return this._engineInfo(result);
402 } catch (e) {
403 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
404 throw e;
405 }
406 }
407
408
409 subscriptionDeliveryGone(dbCtx, callback, topicId) {
410 const _scope = _fileScope('subscriptionDeliveryGone');
411 this.logger.debug(_scope, 'called', { callback, topicId });
412
413 let result;
414 try {
415 this.db.transaction(() => {
416 result = this.statement.subscriptionDelete.run({ callback, topicId });
417 if (result.changes != 1) {
418 throw new DBErrors.UnexpectedResult('did not delete subscription');
419 }
420 // Delete cascades to delivery
421 // result = this.statement.subscriptionDeliveryDone.run({ callback, topicId });
422 // if (result.changes != 1) {
423 // throw new DBErrors.UnexpectedResult('did not complete subscription delivery');
424 // }
425 })();
426 return this._engineInfo(result);
427 } catch (e) {
428 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
429 throw e;
430 }
431 }
432
433
434 subscriptionDeliveryIncomplete(dbCtx, callback, topicId, retryDelays = [60]) {
435 const _scope = _fileScope('subscriptionDeliveryIncomplete');
436 this.logger.debug(_scope, 'called', { callback, topicId, retryDelays });
437
438 let result;
439 try {
440 this.db.transaction(() => {
441 const { currentAttempt } = this.statement.subscriptionDeliveryAttempts.get({ callback, topicId });
442 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
443 result = this.statement.subscriptionDeliveryFailure.run({ nextAttemptDelaySeconds, callback, topicId });
444 if (result.changes != 1) {
445 throw new DBErrors.UnexpectedResult('did not set delivery failure');
446 }
447 result = this.statement.subscriptionDeliveryDone.run({ callback, topicId });
448 if (result.changes != 1) {
449 throw new DBErrors.UnexpectedResult('did not complete subscription delivery');
450 }
451 })();
452 } catch (e) {
453 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
454 throw e;
455 }
456 }
457
458
459 subscriptionGet(dbCtx, callback, topicId) {
460 const _scope = _fileScope('subscriptionGet');
461 this.logger.debug(_scope, 'called', { callback, topicId });
462
463 let subscription;
464 try {
465 subscription = this.statement.subscriptionGet.get({ callback, topicId });
466 return subscription;
467 } catch (e) {
468 this.logger.error(_scope, 'failed', { error: e, callback, topicId });
469 throw e;
470 }
471 }
472
473
474 subscriptionGetById(dbCtx, subscriptionId) {
475 const _scope = _fileScope('subscriptionGetById');
476 this.logger.debug(_scope, 'called', { subscriptionId });
477
478 let subscription;
479 try {
480 subscription = this.statement.subscriptionGetById.get({ subscriptionId });
481 return subscription;
482 } catch (e) {
483 this.logger.error(_scope, 'failed', { error: e, subscriptionId });
484 throw e;
485 }
486 }
487
488
489 subscriptionUpdate(dbCtx, data) {
490 const _scope = _fileScope('subscriptionUpdate');
491 this.logger.debug(_scope, 'called', { data });
492
493 const subscriptionData = {
494 ...data,
495 };
496
497 this._subscriptionUpdateDataValidate(subscriptionData);
498
499 try {
500 const result = this.statement.subscriptionUpdate.run(subscriptionData);
501 if (result.changes != 1) {
502 throw new DBErrors.UnexpectedResult('did not update subscription');
503 }
504 } catch (e) {
505 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
506 throw e;
507 }
508 }
509
510
511 subscriptionUpsert(dbCtx, data) {
512 const _scope = _fileScope('subscriptionUpsert');
513 this.logger.debug(_scope, 'called', { ...data });
514
515 const subscriptionData = {
516 secret: null,
517 httpRemoteAddr: null,
518 httpFrom: null,
519 ...data,
520 }
521 this._subscriptionUpsertDataValidate(subscriptionData);
522
523 let result;
524 try {
525 result = this.statement.subscriptionUpsert.run(subscriptionData);
526 if (result.changes != 1) {
527 throw new DBErrors.UnexpectedResult('did not upsert subscription');
528 }
529 return this._engineInfo(result);
530 } catch (e) {
531 this.logger.error(_scope, 'failed', { error: e, subscriptionData });
532 throw e;
533 }
534 }
535
536
537 topicDeleted(dbCtx, topicId) {
538 const _scope = _fileScope('topicDeleted');
539 this.logger.debug(_scope, 'called', { topicId });
540
541 let result;
542 try {
543 result = this.statement.topicDeleted.run({ topicId });
544 if (result.changes != 1) {
545 throw new DBErrors.UnexpectedResult('did not update topic as deleted');
546 }
547 } catch (e) {
548 this.logger.error(_scope, 'failed', { error: e, topicId });
549 throw e;
550 }
551 }
552
553
554 topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
555 const _scope = _fileScope('topicFetchClaim');
556 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
557
558 let topicIds;
559 try {
560 this.db.transaction(() => {
561 topicIds = this.statement.topicContentFetchNeeded.all({ wanted }).map((claim) => claim.id);
562 topicIds.forEach((topicId) => {
563 const result = this.statement.topicContentFetchClaimById.run({ topicId, claimTimeoutSeconds, claimant });
564 if (result.changes != 1) {
565 throw new DBErrors.UnexpectedResult('did not claim topic fetch');
566 }
567 });
568 })();
569 return topicIds;
570 } catch (e) {
571 this.logger.error(_scope, 'failed to claim topics for fetch', { error: e, wanted, claimTimeoutSeconds, claimant, topicIds });
572 throw e;
573 }
574 }
575
576
577 topicFetchClaimById(dbCtx, topicId, claimTimeoutSeconds, claimant) {
578 const _scope = _fileScope('topicFetchClaimById');
579 this.logger.debug(_scope, 'called', { topicId, claimTimeoutSeconds, claimant });
580
581 let result;
582 try {
583 result = this.statement.topicContentFetchClaimById.run({ topicId, claimTimeoutSeconds, claimant });
584 if (result.changes != 1) {
585 throw new DBErrors.UnexpectedResult('did not claim topic fetch');
586 }
587 return this._engineInfo(result);
588 } catch (e) {
589 this.logger.error(_scope, 'failed to claim topics for fetch', { error: e, topicId, claimTimeoutSeconds, claimant });
590 throw e;
591 }
592 }
593
594
595 topicFetchComplete(dbCtx, topicId) {
596 const _scope = _fileScope('topicFetchComplete');
597 this.logger.debug(_scope, 'called', { topicId });
598
599 let result;
600 try {
601 this.db.transaction(() => {
602 result = this.statement.topicAttemptsReset.run({ topicId, forever: EPOCH_FOREVER });
603 if (result.changes != 1) {
604 throw new DBErrors.UnexpectedResult('did not reset topic attempts');
605 }
606 result = this.statement.topicContentFetchDone.run({ topicId });
607 if (result.changes != 1) {
608 throw new DBErrors.UnexpectedResult('did not release topic fetch');
609 }
610 })();
611 return this._engineInfo(result);
612 } catch (e) {
613 this.logger.error(_scope, 'failed', { error: e, result, topicId });
614 throw e;
615 }
616 }
617
618
619 topicFetchIncomplete(dbCtx, topicId, retryDelays = [60]) {
620 const _scope = _fileScope('topicFetchIncomplete');
621 this.logger.debug(_scope, 'called', { topicId });
622
623 let result;
624 try {
625 this.db.transaction(() => {
626 const { contentFetchAttemptsSinceSuccess: currentAttempt } = this.statement.topicAttempts.get({ topicId });
627 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
628 result = this.statement.topicAttemptsIncrement.run({ topicId, nextAttemptDelaySeconds });
629 if (result.changes != 1) {
630 throw new DBErrors.UnexpectedResult('did not set topic attempts');
631 }
632 result = this.statement.topicContentFetchDone.run({ topicId });
633 if (result.changes != 1) {
634 throw new DBErrors.UnexpectedResult('did not release topic fetch');
635 }
636 return result;
637 })();
638 return this._engineInfo(result);
639 } catch (e) {
640 this.logger.error(_scope, 'failed', { error: e, result, topicId });
641 throw e;
642 }
643 }
644
645
646 topicFetchRequested(dbCtx, topicId) {
647 const _scope = _fileScope('topicFetchRequested');
648 this.logger.debug(_scope, 'called', { topicId });
649
650 let result;
651 try {
652 result = this.statement.topicContentFetchRequested.run({ topicId });
653 if (result.changes != 1) {
654 throw new DBErrors.UnexpectedResult('did not set topic fetch requested');
655 }
656 return this._engineInfo(result);
657 } catch (e) {
658 this.logger.error(_scope, 'failed', { error: e, topicId });
659 throw e;
660 }
661 }
662
663
664 /**
665 * Converts engine topic fields to native types.
666 * @param {Object} data
667 */
668 static _topicDataToNative(data) {
669 const epochToDate = (epoch) => new Date(Number(epoch) * 1000);
670 if (data) {
671 data.isActive = !!data.isActive;
672 data.isDeleted = !!data.isDeleted;
673 ['created', 'lastPublish', 'contentFetchNextAttempt', 'contentUpdated'].forEach((field) => {
674 // eslint-disable-next-line security/detect-object-injection
675 data[field] = epochToDate(data[field]);
676 });
677 }
678 return data;
679 }
680
681
682 // eslint-disable-next-line no-unused-vars
683 topicGetAll(dbCtx) {
684 const _scope = _fileScope('topicGetAll');
685 this.logger.debug(_scope, 'called');
686
687 let topics;
688 try {
689 topics = this.statement.topicGetInfoAll.all();
690 } catch (e) {
691 this.logger.error(_scope, 'failed', { error: e, topics });
692 throw e;
693 }
694 if (topics) {
695 topics = topics
696 .map(DatabaseSQLite._topicDataToNative)
697 .map(this._topicDefaults.bind(this));
698 }
699 return topics;
700 }
701
702
703 topicGetById(dbCtx, topicId, applyDefaults = true) {
704 const _scope = _fileScope('topicGetById');
705 this.logger.debug(_scope, 'called', { topicId });
706
707 let topic;
708 try {
709 topic = this.statement.topicGetById.get({ topicId });
710 DatabaseSQLite._topicDataToNative(topic);
711 if (applyDefaults) {
712 topic = this._topicDefaults(topic);
713 }
714 return topic;
715 } catch (e) {
716 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
717 throw e;
718 }
719 }
720
721
722 topicGetByUrl(dbCtx, topicUrl) {
723 const _scope = _fileScope('topicGetByUrl');
724 this.logger.debug(_scope, 'called', { topicUrl });
725
726 let topic;
727 try {
728 topic = this.statement.topicGetByUrl.get({ topicUrl });
729 DatabaseSQLite._topicDataToNative(topic);
730 return this._topicDefaults(topic);
731 } catch (e) {
732 this.logger.error(_scope, 'failed', { error: e, topic, topicUrl });
733 throw e;
734 }
735 }
736
737
738 topicGetContentById(dbCtx, topicId) {
739 const _scope = _fileScope('topicGetContentById');
740 this.logger.debug(_scope, 'called', { topicId });
741
742 let topic;
743 try {
744 topic = this.statement.topicGetContentById.get({ topicId });
745 DatabaseSQLite._topicDataToNative(topic);
746 return this._topicDefaults(topic);
747 } catch (e) {
748 this.logger.error(_scope, 'failed', { error: e, topic, topicId });
749 throw e;
750 }
751 }
752
753
754 topicSet(dbCtx, data) {
755 const _scope = _fileScope('topicSet');
756 this.logger.debug(_scope, 'called', data);
757
758 const topicSetData = {
759 publisherValidationUrl: null,
760 leaseSecondsPreferred: null,
761 leaseSecondsMin: null,
762 leaseSecondsMax: null,
763 ...data,
764 };
765
766 let result;
767 try {
768 this._topicSetDataValidate(topicSetData);
769 result = this.statement.topicUpsert.run(topicSetData);
770 if (result.changes != 1) {
771 throw new DBErrors.UnexpectedResult('did not set topic data');
772 }
773 return this._engineInfo(result);
774 } catch (e) {
775 this.logger.error(_scope, 'failed', { error: e, result });
776 throw e;
777 }
778 }
779
780
781 topicSetContent(dbCtx, data) {
782 const _scope = _fileScope('topicSetContent');
783 const topicSetContentData = {
784 contentType: null,
785 ...data,
786 };
787 const logData = {
788 ...topicSetContentData,
789 content: common.logTruncate(topicSetContentData.content, 100),
790 };
791 this.logger.debug(_scope, 'called', logData);
792
793 let result;
794 try {
795 this._topicSetContentDataValidate(topicSetContentData);
796 result = this.statement.topicSetContent.run(topicSetContentData);
797 logData.result = result;
798 if (result.changes != 1) {
799 throw new DBErrors.UnexpectedResult('did not set topic content');
800 }
801 return this._engineInfo(result);
802 } catch (e) {
803 this.logger.error(_scope, 'failed', { error: e, ...logData });
804 throw e;
805 }
806 }
807
808
809 topicUpdate(dbCtx, data) {
810 const _scope = _fileScope('topicUpdate');
811 this.logger.debug(_scope, 'called', { data });
812
813 const topicData = {
814 leaseSecondsPreferred: null,
815 leaseSecondsMin: null,
816 leaseSecondsMax: null,
817 publisherValidationUrl: null,
818 ...data,
819 };
820
821 this._topicUpdateDataValidate(topicData);
822
823 try {
824 const result = this.statement.topicUpdate.run(topicData);
825 if (result.changes != 1) {
826 throw new DBErrors.UnexpectedResult('did not update topic');
827 }
828 } catch (e) {
829 this.logger.error(_scope, 'failed', { error: e, topicData });
830 throw e;
831 }
832 }
833
834
835 verificationClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
836 const _scope = _fileScope('verificationClaim');
837 this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds });
838
839 let verificationIds;
840 try {
841 this.db.transaction(() => {
842 verificationIds = this.statement.verificationNeeded.all({ wanted }).map((claim) => claim.id);
843 verificationIds.forEach((verificationId) => {
844 const result = this.statement.verificationClaimById.run({ verificationId, claimTimeoutSeconds, claimant });
845 if (result.changes != 1) {
846 throw new DBErrors.UnexpectedResult('did not claim verification');
847 }
848 });
849 })();
850 return verificationIds;
851 } catch (e) {
852 this.logger.error(_scope, 'failed to claim verifications', { wanted, claimTimeoutSeconds });
853 throw e;
854 }
855 }
856
857
858 verificationClaimById(dbCtx, verificationId, claimTimeoutSeconds, claimant) {
859 const _scope = _fileScope('verificationClaimById');
860 this.logger.debug(_scope, 'called', { verificationId, claimTimeoutSeconds, claimant });
861
862 let result;
863 try {
864 result = this.statement.verificationClaimById.run({ verificationId, claimTimeoutSeconds, claimant });
865 if (result.changes != 1) {
866 throw new DBErrors.UnexpectedResult('did not claim verification');
867 }
868 return this._engineInfo(result);
869 } catch (e) {
870 this.logger.error(_scope, 'failed to claim verification', { error: e, verificationId, claimTimeoutSeconds, claimant });
871 throw e;
872 }
873 }
874
875
876 verificationComplete(dbCtx, verificationId, callback, topicId) {
877 const _scope = _fileScope('verificationComplete');
878 this.logger.debug(_scope, 'called', { verificationId });
879
880 let result;
881 try {
882 this.db.transaction(() => {
883 result = this.statement.verificationScrub.run({ verificationId, callback, topicId });
884 if (result.changes < 1) {
885 throw new DBErrors.UnexpectedResult('did not remove verifications');
886 }
887 })();
888 } catch (e) {
889 this.logger.error(_scope, 'failed', { verificationId });
890 throw e;
891 }
892 return this._engineInfo(result);
893 }
894
895
896 /**
897 * Converts engine verification fields to native types.
898 * @param {Object} data
899 */
900 static _verificationDataToNative(data) {
901 if (data) {
902 data.isPublisherValidated = !!data.isPublisherValidated;
903 }
904 }
905
906
907 verificationGetById(dbCtx, verificationId) {
908 const _scope = _fileScope('verificationGetById');
909 this.logger.debug(_scope, 'called', { verificationId });
910
911 let verification;
912 try {
913 verification = this.statement.verificationGetById.get({ verificationId });
914 DatabaseSQLite._verificationDataToNative(verification);
915 return verification;
916 } catch (e) {
917 this.logger.error(_scope, 'failed', { error: e, verificationId });
918 throw e;
919 }
920 }
921
922
923 verificationIncomplete(dbCtx, verificationId, retryDelays = [60]) {
924 const _scope = _fileScope('verificationIncomplete');
925 this.logger.debug(_scope, 'called', { verificationId });
926
927 let result;
928 try {
929 this.db.transaction(() => {
930 const { attempts: currentAttempt } = this.statement.verificationAttempts.get({ verificationId });
931 const nextAttemptDelaySeconds = common.attemptRetrySeconds(currentAttempt, retryDelays);
932 result = this.statement.verificationAttemptsIncrement.run({ verificationId, nextAttemptDelaySeconds });
933 if (result.changes != 1) {
934 throw new DBErrors.UnexpectedResult('did not increment verification attempts');
935 }
936 result = this.statement.verificationDone.run({ verificationId });
937 if (result.changes != 1) {
938 throw new DBErrors.UnexpectedResult('did not release verification in progress');
939 }
940 return result;
941 })();
942 return this._engineInfo(result);
943 } catch (e) {
944 this.logger.error(_scope, 'failed', { error: e, result, verificationId });
945 throw e;
946 }
947 }
948
949
950 /**
951 * Convert native verification fields to engine types.
952 */
953 static _verificationDataToEngine(data) {
954 if (data) {
955 data.isPublisherValidated = data.isPublisherValidated ? 1 : 0;
956 }
957 }
958
959
960 verificationInsert(dbCtx, verification) {
961 const _scope = _fileScope('verificationInsert');
962 this.logger.debug(_scope, 'called', { verification });
963
964 const verificationData = {
965 secret: null,
966 httpRemoteAddr: null,
967 httpFrom: null,
968 requestId: null,
969 ...verification,
970 };
971
972 let result, verificationId;
973 try {
974 this._verificationDataValidate(verificationData);
975 DatabaseSQLite._verificationDataToEngine(verificationData);
976 result = this.statement.verificationInsert.run(verificationData);
977 if (result.changes != 1) {
978 throw new DBErrors.UnexpectedResult('did not insert verification');
979 }
980 verificationId = result.lastInsertRowid;
981 this.logger.debug(_scope, 'inserted verification', { verificationId });
982
983 return verificationId;
984 } catch (e) {
985 this.logger.error(_scope, 'failed', { error: e, verificationData });
986 throw e;
987 }
988 }
989
990
991 verificationRelease(dbCtx, verificationId) {
992 const _scope = _fileScope('verificationRelease');
993 this.logger.debug(_scope, 'called', { verificationId });
994
995 let result;
996 try {
997 result = this.statement.verificationDone.run({ verificationId });
998 if (result.changes != 1) {
999 throw new DBErrors.UnexpectedResult('did not release verification');
1000 }
1001 } catch (e) {
1002 this.logger.error(_scope, 'failed', { error: e, verificationId });
1003 throw e;
1004 }
1005 }
1006
1007
1008 verificationUpdate(dbCtx, verificationId, data) {
1009 const _scope = _fileScope('verificationUpdate');
1010 this.logger.debug(_scope, 'called', { verificationId, data });
1011
1012 const verificationData = {
1013 reason: null,
1014 verificationId,
1015 ...data,
1016 };
1017
1018 let result;
1019 try {
1020 this._verificationUpdateDataValidate(verificationData);
1021 DatabaseSQLite._verificationDataToEngine(verificationData);
1022 result = this.statement.verificationUpdate.run(verificationData);
1023 if (result.changes != 1) {
1024 throw new DBErrors.UnexpectedResult('did not update verification');
1025 }
1026 } catch (e) {
1027 this.logger.error(_scope, 'failed', { error: e, verificationData });
1028 throw e;
1029 }
1030 }
1031
1032
1033 verificationValidated(dbCtx, verificationId) {
1034 const _scope = _fileScope('verificationValidated');
1035 this.logger.debug(_scope, 'called', { verificationId });
1036
1037 let result;
1038 try {
1039 result = this.statement.verificationValidate.run({ verificationId });
1040 if (result.changes != 1) {
1041 throw new DBErrors.UnexpectedResult('did not set verification validation');
1042 }
1043 } catch (e) {
1044 this.logger.error(_scope, 'failed', { error: e, verificationId });
1045 throw e;
1046 }
1047 }
1048
1049
1050 }
1051
1052 module.exports = DatabaseSQLite;