minor doc update
[websub-hub] / src / db / base.js
1 /* eslint-disable no-unused-vars */
2 'use strict';
3
4 /**
5 * This is the semi-abstract database class, providing interface and utility methods.
6 */
7
8 const common = require('../common');
9 const DBErrors = require('./errors');
10 const svh = require('./schema-version-helper');
11
12 const _fileScope = common.fileScope(__filename);
13
14 class Database {
15 constructor(logger = common.nullLogger, options = {}) {
16 this.logger = logger;
17 common.ensureLoggerLevels(this.logger);
18
19 // Store the merged config and default values for lease values.
20 // N.B. breaking hierarchy of config options here
21 this.topicLeaseDefaults = {};
22 common.setOptions(this.topicLeaseDefaults, common.topicLeaseDefaults(), options.topicLeaseDefaults || {});
23 }
24
25
26 /**
27 * Turn a snake into a camel.
28 * Used when translating SQL column names to JS object style.
29 * @param {String} snakeCase
30 * @param {String|RegExp} delimiter
31 * @returns {String}
32 */
33 static _camelfy(snakeCase, delimiter = '_') {
34 if (!snakeCase || typeof snakeCase.split !== 'function') {
35 return undefined;
36 }
37 const words = snakeCase.split(delimiter);
38 return [
39 words.shift(),
40 ...words.map((word) => word.charAt(0).toUpperCase() + word.slice(1)),
41 ].join('');
42 }
43
44
45 /**
46 * Basic type checking of object properties.
47 * @param {Object} object
48 * @param {String[]} properties
49 * @param {String[]} types
50 */
51 _ensureTypes(object, properties, types) {
52 const _scope = _fileScope('_ensureTypes');
53
54 if (!(object && properties && types)) {
55 this.logger.error(_scope, 'undefined argument', { object, properties, types });
56 throw new DBErrors.DataValidation();
57 }
58 properties.forEach((p) => {
59 // eslint-disable-next-line security/detect-object-injection
60 const pObj = object[p];
61 const pType = typeof pObj;
62 if (!types.includes(pType)
63 && !(pObj instanceof Buffer && types.includes('buffer'))
64 && !(pObj === null && types.includes('null'))
65 && !(pType === 'bigint' && types.includes('number'))) {
66 const reason = `'${p}' is '${pType}', but must be ${types.length > 1 ? 'one of ' : ''}'${types}'`;
67 this.logger.error(_scope, reason, {});
68 throw new DBErrors.DataValidation(reason);
69 }
70 });
71 }
72
73
74 /**
75 * Interface methods need implementations.
76 * @param {String} method
77 * @param {arguments} args
78 */
79 _notImplemented(method, args) {
80 this.logger.error(_fileScope(method), 'abstract method called', Array.from(args));
81 throw new DBErrors.NotImplemented(method);
82 }
83
84
85 /**
86 * Perform tasks needed to prepare database for use. Ensure this is called
87 * after construction, and before any other database activity.
88 * At the minimum, this will validate a compatible schema is present and usable.
89 * Some engines will also perform other initializations or async actions which
90 * are easier handled outside the constructor.
91 */
92 async initialize() {
93 const _scope = _fileScope('initialize');
94
95 const currentSchema = await this._currentSchema();
96 const current = svh.schemaVersionObjectToNumber(currentSchema);
97 const min = svh.schemaVersionObjectToNumber(this.schemaVersionsSupported.min);
98 const max = svh.schemaVersionObjectToNumber(this.schemaVersionsSupported.max);
99 if (current >= min && current <= max) {
100 this.logger.debug(_scope, 'schema supported', { currentSchema, schemaVersionsSupported: this.schemaVersionsSupported });
101 } else {
102 this.logger.error(_scope, 'schema not supported', { currentSchema, schemaVersionsSupported: this.schemaVersionsSupported });
103 throw new DBErrors.MigrationNeeded();
104 }
105 }
106
107
108 /**
109 * Perform db connection healthcheck.
110 */
111 async healthCheck() {
112 this._notImplemented('healthCheck', arguments);
113 }
114
115
116 /**
117 * Replace any NULL from topic DB entry with default values.
118 * @param {Object} topic
119 * @returns {Object}
120 */
121 _topicDefaults(topic) {
122 if (topic) {
123 for (const [key, value] of Object.entries(this.topicLeaseDefaults)) {
124 // eslint-disable-next-line security/detect-object-injection
125 if (!(key in topic) || topic[key] === null) {
126 // eslint-disable-next-line security/detect-object-injection
127 topic[key] = value;
128 }
129 }
130 }
131 return topic;
132 }
133
134
135 /**
136 * Ensures any lease durations in data are consistent.
137 * @param {Object} data
138 */
139 _leaseDurationsValidate(data) {
140 const leaseProperties = Object.keys(this.topicLeaseDefaults)
141 this._ensureTypes(data, leaseProperties, ['number', 'undefined', 'null']);
142
143 // Populate defaults on a copy of values so we can check proper numerical ordering
144 const leaseValues = common.pick(data, leaseProperties);
145 this._topicDefaults(leaseValues);
146 for (const [prop, value] of Object.entries(leaseValues)) {
147 if (value <= 0) {
148 throw new DBErrors.DataValidation(`${prop} must be positive`);
149 }
150 }
151 if (!(leaseValues.leaseSecondsMin <= leaseValues.leaseSecondsPreferred && leaseValues.leaseSecondsPreferred <= leaseValues.leaseSecondsMax)) {
152 throw new DBErrors.DataValidation('lease durations violate numerical ordering');
153 }
154 }
155
156
157 /**
158 * Basic field validation for setting topic data.
159 * @param {Object} data
160 */
161 _topicSetDataValidate(data) {
162 this._ensureTypes(data, ['url'], ['string']);
163 this._ensureTypes(data, ['publisherValidationUrl'], ['string', 'undefined', 'null']);
164 this._leaseDurationsValidate(data);
165 }
166
167
168 /**
169 * Basic field validation for setting topic content.
170 * @param {Object} data
171 */
172 _topicSetContentDataValidate(data) {
173 this._ensureTypes(data, ['content'], ['string', 'buffer']);
174 this._ensureTypes(data, ['contentHash'], ['string']);
175 this._ensureTypes(data, ['contentType'], ['string', 'null', 'undefined']);
176 }
177
178
179 /**
180 * Basic field validation for updating topic.
181 * @param {Object} data
182 */
183 _topicUpdateDataValidate(data) {
184 this._ensureTypes(data, ['publisherValidationUrl'], ['string', 'undefined', 'null']);
185 if (data.publisherValidationUrl) {
186 try {
187 new URL(data.publisherValidationUrl);
188 } catch (e) {
189 throw new DBErrors.DataValidation('invalid URL format');
190 }
191 }
192 this._ensureTypes(data, ['contentHashAlgorithm'], ['string']);
193 if (!common.validHash(data.contentHashAlgorithm)) {
194 throw new DBErrors.DataValidation('unsupported hash algorithm');
195 }
196 this._leaseDurationsValidate(data);
197 }
198
199
200 /**
201 * Basic field validation for setting verification data.
202 * @param {Object} data
203 */
204 _verificationDataValidate(data) {
205 this._ensureTypes(data, ['topicId'], ['string', 'number']);
206 this._ensureTypes(data, ['callback', 'mode'], ['string']);
207 this._ensureTypes(data, ['secret', 'httpRemoteAddr', 'httpFrom', 'requestId'], ['string', 'null', 'undefined']);
208 this._ensureTypes(data, ['leaseSeconds'], ['number']);
209 this._ensureTypes(data, ['isPublisherValidated'], ['boolean']);
210 }
211
212
213 /**
214 * Basic field validation for updating verification data.
215 * @param {Object} verification
216 */
217 _verificationUpdateDataValidate(data) {
218 this._ensureTypes(data, ['verificationId'], ['string', 'number']);
219 this._ensureTypes(data, ['mode'], ['string']);
220 this._ensureTypes(data, ['reason'], ['string', 'null', 'undefined']);
221 this._ensureTypes(data, ['isPublisherValidated'], ['boolean']);
222 }
223
224
225 /**
226 * Basic field validation for upserting subscription data.
227 * @param {Object} subscription
228 */
229 _subscriptionUpsertDataValidate(data) {
230 this._ensureTypes(data, ['topicId'], ['string', 'number']);
231 this._ensureTypes(data, ['callback'], ['string']);
232 this._ensureTypes(data, ['leaseSeconds'], ['number']);
233 this._ensureTypes(data, ['secret', 'httpRemoteAddr', 'httpFrom'], ['string', 'null', 'undefined']);
234 }
235
236
237 _subscriptionUpdateDataValidate(data) {
238 this._ensureTypes(data, ['signatureAlgorithm'], ['string', 'null', 'undefined']);
239 if (!common.validHash(data.signatureAlgorithm)) {
240 throw new DBErrors.DataValidation('unsupported hash algorithm');
241 }
242
243 }
244
245 /* Interface methods */
246
247 /**
248 * Normalize query information to a common form from a specific backend.
249 * @param {*} result
250 * @returns {Object} info
251 * @returns {Number} info.changes
252 * @returns {*} info.lastInsertRowid
253 * @returns {Number} info.duration
254 */
255 _engineInfo(result) {
256 this._notImplemented('engineInfo', arguments);
257 }
258
259
260 /**
261 * Query the current schema version.
262 * This is a standalone query function, as it is called before statements are loaded.
263 * @returns {Object} version
264 * @returns {Number} version.major
265 * @returns {Number} version.minor
266 * @returns {Number} version.patch
267 */
268 async _currentSchema() {
269 this._notImplemented('_currentSchema', arguments);
270 }
271
272
273 /**
274 * Wrap a function call in a database context.
275 * @param {Function} fn fn(ctx)
276 */
277 async context(fn) {
278 this._notImplemented('context', arguments);
279 }
280
281
282 /**
283 * Wrap a function call in a transaction context.
284 * @param {*} dbCtx
285 * @param {Function} fn fn(txCtx)
286 */
287 async transaction(dbCtx, fn) {
288 this._notImplemented('transaction', arguments);
289 }
290
291
292 /**
293 * Store an authentication success event.
294 * @param {*} dbCtx
295 * @param {String} identifier
296 */
297 async authenticationSuccess(dbCtx, identifier) {
298 this._notImplemented('authenticationSuccess', arguments);
299 }
300
301
302 /**
303 * Fetch authentication data for identifier.
304 * @param {*} dbCtx
305 * @param {*} identifier
306 */
307 async authenticationGet(dbCtx, identifier) {
308 this._notImplemented('authenticationGet', arguments);
309 }
310
311
312 /**
313 * Create or update an authentication entity.
314 * @param {*} dbCtx
315 * @param {String} identifier
316 * @param {String} credential
317 */
318 async authenticationUpsert(dbCtx, identifier, credential) {
319 this._notImplemented('authenticationUpsert', arguments);
320 }
321
322
323 /**
324 * All subscriptions to a topic.
325 * @param {*} dbCtx
326 * @param {String} topicId
327 */
328 async subscriptionsByTopicId(dbCtx, topicId) {
329 this._notImplemented('subscriptionsByTopicId', arguments);
330 }
331
332
333 /**
334 * Number of subscriptions to a topic.
335 * @param {*} dbCtx
336 * @param {String} topicUrl
337 */
338 async subscriptionCountByTopicUrl(dbCtx, topicUrl) {
339 this._notImplemented('subscriptionCountByTopicUrl', arguments);
340 }
341
342
343 /**
344 * Remove an existing subscription.
345 * @param {*} dbCtx
346 * @param {String} callback
347 * @param {*} topicId
348 */
349 async subscriptionDelete(dbCtx, callback, topicId) {
350 this._notImplemented('subscriptionDelete', arguments);
351 }
352
353
354 /**
355 * Remove any expired subscriptions to a topic.
356 * @param {*} dbCtx
357 * @param {*} topicId
358 */
359 async subscriptionDeleteExpired(dbCtx, topicId) {
360 this._notImplemented('subscriptionDeleteExpired', arguments);
361 }
362
363
364 /**
365 * Claim subscriptions needing content updates attempted.
366 * @param {*} dbCtx
367 * @param {Number} wanted maximum subscription updates to claim
368 * @param {Integer} claimTimeoutSeconds age of claimed updates to reclaim
369 * @param {String} claimant
370 * @returns {Array} list of subscriptions
371 */
372 async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
373 this._notImplemented('subscriptionDeliveryClaim', arguments);
374 }
375
376
377 /**
378 * Claim a subscription delivery.
379 * @param {*} dbCtx
380 * @param {*} subscriptionId
381 * @param {*} claimTimeoutSeconds
382 * @param {*} claimant
383 */
384 async subscriptionDeliveryClaimById(dbCtx, subscriptionId, claimTimeoutSeconds, claimant) {
385 this._notImplemented('subscriptionDeliveryClaimById', arguments);
386 }
387
388
389 /**
390 * A subscriber successfully received new topic content, update subscription.
391 * @param {*} dbCtx
392 * @param {String} callback
393 * @param {*} topicId
394 */
395 async subscriptionDeliveryComplete(dbCtx, callback, topicId) {
396 this._notImplemented('subscriptionDeliveryComplete', arguments);
397 }
398
399
400 /**
401 * A subscriber denied new topic content, remove subscription.
402 * @param {*} dbCtx
403 * @param {String} callback
404 * @param {*} topicId
405 */
406 async subscriptionDeliveryGone(dbCtx, callback, topicId) {
407 this._notImplemented('subscriptionDeliveryGone', arguments);
408 }
409
410
411 /**
412 * An attempt to deliver content to a subscriber did not complete, update delivery accordingly.
413 * @param {*} dbCtx
414 * @param {String} callback
415 * @param {*} topicId
416 * @param {Number[]} retryDelays
417 */
418 async subscriptionDeliveryIncomplete(dbCtx, callback, topicId, retryDelays) {
419 this._notImplemented('subscriptionDeliveryIncomplete', arguments);
420 }
421
422
423 /**
424 * Fetch subscription details
425 * @param {*} dbCtx
426 * @param {String} callback
427 * @param {*} topicId
428 */
429 async subscriptionGet(dbCtx, callback, topicId) {
430 this._notImplemented('subscriptionGet', arguments);
431 }
432
433
434 /**
435 * Fetch subscription details
436 * @param {*} dbCtx
437 * @param {*} subscriptionId
438 */
439 async subscriptionGetById(dbCtx, subscriptionId) {
440 this._notImplemented('subscriptionGetById', arguments);
441 }
442
443
444 /**
445 * Set subscription details
446 * @param {*} dbCtx
447 * @param {Object} data
448 * @param {String} data.callback
449 * @param {*} data.topicId
450 * @param {Number} data.leaseSeconds
451 * @param {String=} data.secret
452 * @param {String=} data.httpRemoteAddr
453 * @param {String=} data.httpFrom
454 */
455 async subscriptionUpsert(dbCtx, data) {
456 this._notImplemented('subscriptionUpsert', arguments);
457 }
458
459
460 /**
461 * Set some subscription fields
462 * @param {*} dbCtx
463 * @param {Object} data
464 * @param {*} data.subscriptionId
465 * @param {String} data.signatureAlgorithm
466 */
467 async subscriptionUpdate(dbCtx, data) {
468 this._notImplemented('subscriptionUpdate', arguments);
469 }
470
471
472 /**
473 * Sets the isDeleted flag on a topic, and reset update time.
474 * @param {*} txCtx
475 * @param {*} topicId
476 */
477 async topicDeleted(dbCtx, topicId) {
478 this._notImplemented('topicDeleted', arguments);
479 }
480
481
482 /**
483 * Claim topics to fetch updates for, from available.
484 * @param {*} dbCtx
485 * @param {Integer} wanted maximum topic fetches to claim
486 * @param {Integer} claimTimeoutSeconds age of claimed topics to reclaim
487 * @param {String} claimant node id claiming these fetches
488 */
489 async topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
490 this._notImplemented('topicFetchClaim', arguments);
491 }
492
493
494 /**
495 * Claim a topic to update.
496 * @param {*} dbCtx
497 * @param {*} topicId
498 * @param {Integer} claimTimeoutSeconds age of claimed topics to reclaim
499 * @param {String} claimant node id claiming these fetches
500 */
501 async topicFetchClaimById(dbCtx, topicId, claimTimeoutSeconds, claimant) {
502 this._notImplemented('topicFetchClaim', arguments);
503 }
504
505
506 /**
507 * Reset publish state, and reset deliveries for subscribers.
508 * @param {*} dbCtx
509 * @param {*} topicId
510 */
511 async topicFetchComplete(dbCtx, topicId) {
512 this._notImplemented('topicFetchComplete', arguments);
513 }
514
515
516 /**
517 * Bump count of attempts and release claim on update.
518 * @param {*} dbCtx
519 * @param {*} topicId
520 * @param {Number[]} retryDelays
521 */
522 async topicFetchIncomplete(dbCtx, topicId, retryDelays) {
523 this._notImplemented('topicFetchIncomplete', arguments);
524 }
525
526
527 /**
528 * Set a topic as ready to be checked for an update.
529 * @param {*} dbCtx
530 * @param {*} topicId
531 * @returns {Boolean}
532 */
533 async topicFetchRequested(dbCtx, topicId) {
534 this._notImplemented('topicPublish', arguments);
535 }
536
537
538 /**
539 * Get all data for all topics, including subscription count.
540 * @param {*} dbCtx
541 */
542 async topicGetAll(dbCtx) {
543 this._notImplemented('topicGetAll', arguments);
544 }
545
546
547 /**
548 * Get topic data, without content.
549 * @param {*} dbCtx
550 * @param {String} topicUrl
551 */
552 async topicGetByUrl(dbCtx, topicUrl) {
553 this._notImplemented('topicGetByUrl', arguments);
554 }
555
556
557 /**
558 * Get topic data, without content.
559 * @param {*} dbCtx
560 * @param {*} topicId
561 * @param {Boolean} applyDefaults
562 */
563 async topicGetById(dbCtx, topicId, applyDefaults = true) {
564 this._notImplemented('topicGetById', arguments);
565 }
566
567
568 /**
569 * Returns topic data with content.
570 * @param {*} dbCx
571 * @param {*} topicId
572 */
573 async topicGetContentById(dbCx, topicId) {
574 this._notImplemented('topicGetContentById', arguments);
575 }
576
577
578 /**
579 * Attempt to delete a topic, which must be set isDeleted, if there
580 * are no more subscriptions belaying its removal.
581 * @param {*} topicId
582 */
583 async topicPendingDelete(dbCtx, topicId) {
584 this._notImplemented('topicPendingDelete', arguments);
585 }
586
587
588 /**
589 * Return an array of the counts of the last #days of topic updates.
590 * @param {*} dbCtx
591 * @param {*} topicId
592 * @param {Number} days
593 * @returns {Number[]}
594 */
595 async topicPublishHistory(dbCtx, topicId, days) {
596 this._notImplemented('topicPublishHistory', arguments);
597 }
598
599
600 /**
601 * Create or update the basic parameters of a topic.
602 * @param {*} dbCtx
603 * @param {TopicData} data
604 */
605 async topicSet(dbCtx, data) {
606 this._notImplemented('topicSet', arguments);
607 }
608
609
610 /**
611 * Updates a topic's content data and content update timestamp.
612 * @param {Object} data
613 * @param {*} data.topicId
614 * @param {String} data.content
615 * @param {String} data.contentHash
616 * @param {String=} data.contentType
617 */
618 async topicSetContent(dbCtx, data) {
619 this._notImplemented('topicSetContent', arguments);
620 }
621
622
623 /**
624 * Set some topic fields.
625 * @param {*} dbCtx
626 * @param {Object} data
627 * @param {*} data.topicId
628 * @param {Number=} data.leaseSecondsPreferred
629 * @param {Number=} data.leaseSecondsMin
630 * @param {Number=} data.leaseSecondsMax
631 * @param {String=} data.publisherValidationUrl
632 * @param {String=} data.contentHashAlgorithm
633 */
634 async topicUpdate(dbCtx, data) {
635 this._notImplemented('topicUpdate', arguments);
636 }
637
638
639 /**
640 * Claim pending verifications for attempted resolution.
641 * @param {*} dbCtx
642 * @param {Integer} wanted maximum verifications to claim
643 * @param {Integer} claimTimeoutSeconds age of claimed verifications to reclaim
644 * @returns {Verification[]} array of claimed verifications
645 */
646 async verificationClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
647 this._notImplemented('verificationClaim', arguments);
648 }
649
650
651 /**
652 * Claim a specific verification by id, if no other similar verification claimed.
653 * @param {*} dbCtx
654 * @param {*} verificationId
655 * @param {Number} claimTimeoutSeconds
656 * @param {String} claimant
657 */
658 async verificationClaimById(dbCtx, verificationId, claimTimeoutSeconds, claimant) {
659 this._notImplemented('verificationClaimById', arguments);
660 }
661
662
663 /**
664 * Remove the verification, any older
665 * verifications for that same client/topic, and the claim.
666 * @param {*} dbCtx
667 * @param {*} verificationId
668 * @param {String} callback
669 * @param {*} topicId
670 */
671 async verificationComplete(dbCtx, verificationId, callback, topicId) {
672 this._notImplemented('verificationComplete', arguments);
673 }
674
675
676 /**
677 * Get verification data.
678 * @param {*} dbCtx
679 * @param {*} verificationId
680 */
681 async verificationGetById(dbCtx, verificationId) {
682 this._notImplemented('verificationGetById', arguments);
683 }
684
685
686 /**
687 * Update database that a client verification was unable to complete.
688 * This releases the delivery claim and reschedules for some future time.
689 * @param {*} dbCtx
690 * @param {String} callback client callback url
691 * @param {*} topicId internal topic id
692 * @param {Number[]} retryDelays
693 */
694 async verificationIncomplete(dbCtx, verificationId, retryDelays) {
695 this._notImplemented('verificationIncomplete', arguments);
696 }
697
698
699 /**
700 * Create a new pending verification.
701 * @param {*} dbCtx
702 * @param {VerificationData} data
703 * @param {Boolean} claim
704 * @returns {*} verificationId
705 */
706 async verificationInsert(dbCtx, verification) {
707 this._notImplemented('verificationInsert', arguments);
708 }
709
710
711 /**
712 * Relinquish the claim on a verification, without any other updates.
713 * @param {*} dbCtx
714 * @param {String} callback client callback url
715 * @param {*} topicId internal topic id
716 */
717 async verificationRelease(dbCtx, verificationId) {
718 this._notImplemented('verificationRelease', arguments);
719 }
720
721
722 /**
723 * Updates some fields of an existing (presumably claimed) verification.
724 * @param {*} dbCtx
725 * @param {*} verificationId
726 * @param {Object} data
727 * @param {String} data.mode
728 * @param {String} data.reason
729 * @param {Boolean} data.isPublisherValidated
730 */
731 async verificationUpdate(dbCtx, verificationId, data) {
732 this._notImplemented('verificationUpdate', arguments);
733 }
734
735
736 /**
737 * Sets the isPublisherValidated flag on a verification and resets the delivery
738 * @param {*} dbCtx
739 * @param {*} verificationId
740 */
741 async verificationValidated(dbCtx, verificationId) {
742 this._notImplemented('verificationValidated', arguments);
743 }
744
745 }
746
747 module.exports = Database;