fix topic update integration test, topicGetByUrl now optionally applies defaults
[websub-hub] / src / db / base.js
1 /* eslint-disable no-unused-vars */
2 'use strict';
3
4 /**
5 * This is the semi-abstract database class, providing interface and utility methods.
6 */
7
8 const common = require('../common');
9 const DBErrors = require('./errors');
10 const svh = require('./schema-version-helper');
11
12 const _fileScope = common.fileScope(__filename);
13
14 class Database {
15 constructor(logger = common.nullLogger, options = {}) {
16 this.logger = logger;
17 common.ensureLoggerLevels(this.logger);
18
19 // Store the merged config and default values for lease values.
20 // N.B. breaking hierarchy of config options here
21 this.topicLeaseDefaults = {};
22 common.setOptions(this.topicLeaseDefaults, common.topicLeaseDefaults(), options.topicLeaseDefaults || {});
23 }
24
25
26 /**
27 * Turn a snake into a camel.
28 * Used when translating SQL column names to JS object style.
29 * @param {String} snakeCase
30 * @param {String|RegExp} delimiter
31 * @returns {String}
32 */
33 static _camelfy(snakeCase, delimiter = '_') {
34 if (!snakeCase || typeof snakeCase.split !== 'function') {
35 return undefined;
36 }
37 const words = snakeCase.split(delimiter);
38 return [
39 words.shift(),
40 ...words.map((word) => word.charAt(0).toUpperCase() + word.slice(1)),
41 ].join('');
42 }
43
44
45 /**
46 * Basic type checking of object properties.
47 * @param {Object} object
48 * @param {String[]} properties
49 * @param {String[]} types
50 */
51 _ensureTypes(object, properties, types) {
52 const _scope = _fileScope('_ensureTypes');
53
54 if (!(object && properties && types)) {
55 this.logger.error(_scope, 'undefined argument', { object, properties, types });
56 throw new DBErrors.DataValidation();
57 }
58 properties.forEach((p) => {
59 // eslint-disable-next-line security/detect-object-injection
60 const pObj = object[p];
61 const pType = typeof pObj;
62 if (!types.includes(pType)
63 && !(pObj instanceof Buffer && types.includes('buffer'))
64 && !(pObj === null && types.includes('null'))
65 && !(pType === 'bigint' && types.includes('number'))) {
66 const reason = `'${p}' is '${pType}', but must be ${types.length > 1 ? 'one of ' : ''}'${types}'`;
67 this.logger.error(_scope, reason, {});
68 throw new DBErrors.DataValidation(reason);
69 }
70 });
71 }
72
73
74 /**
75 * Interface methods need implementations.
76 * @param {String} method
77 * @param {arguments} args
78 */
79 _notImplemented(method, args) {
80 this.logger.error(_fileScope(method), 'abstract method called', Array.from(args));
81 throw new DBErrors.NotImplemented(method);
82 }
83
84
85 /**
86 * Perform tasks needed to prepare database for use. Ensure this is called
87 * after construction, and before any other database activity.
88 * At the minimum, this will validate a compatible schema is present and usable.
89 * Some engines will also perform other initializations or async actions which
90 * are easier handled outside the constructor.
91 */
92 async initialize() {
93 const _scope = _fileScope('initialize');
94
95 const currentSchema = await this._currentSchema();
96 const current = svh.schemaVersionObjectToNumber(currentSchema);
97 const min = svh.schemaVersionObjectToNumber(this.schemaVersionsSupported.min);
98 const max = svh.schemaVersionObjectToNumber(this.schemaVersionsSupported.max);
99 if (current >= min && current <= max) {
100 this.logger.debug(_scope, 'schema supported', { currentSchema, schemaVersionsSupported: this.schemaVersionsSupported });
101 } else {
102 this.logger.error(_scope, 'schema not supported', { currentSchema, schemaVersionsSupported: this.schemaVersionsSupported });
103 throw new DBErrors.MigrationNeeded();
104 }
105 }
106
107
108 /**
109 * Perform db connection healthcheck.
110 */
111 async healthCheck() {
112 this._notImplemented('healthCheck', arguments);
113 }
114
115
116 /**
117 * Replace any NULL from topic DB entry with default values.
118 * @param {Object} topic
119 * @returns {Object}
120 */
121 _topicDefaults(topic) {
122 if (topic) {
123 for (const [key, value] of Object.entries(this.topicLeaseDefaults)) {
124 // eslint-disable-next-line security/detect-object-injection
125 if (!(key in topic) || topic[key] === null) {
126 // eslint-disable-next-line security/detect-object-injection
127 topic[key] = value;
128 }
129 }
130 }
131 return topic;
132 }
133
134
135 /**
136 * Ensures any lease durations in data are consistent.
137 * @param {Object} data
138 */
139 _leaseDurationsValidate(data) {
140 const leaseProperties = Object.keys(this.topicLeaseDefaults)
141 this._ensureTypes(data, leaseProperties, ['number', 'undefined', 'null']);
142
143 // Populate defaults on a copy of values so we can check proper numerical ordering
144 const leaseValues = common.pick(data, leaseProperties);
145 this._topicDefaults(leaseValues);
146 for (const [prop, value] of Object.entries(leaseValues)) {
147 if (value <= 0) {
148 throw new DBErrors.DataValidation(`${prop} must be positive`);
149 }
150 }
151 if (!(leaseValues.leaseSecondsMin <= leaseValues.leaseSecondsPreferred && leaseValues.leaseSecondsPreferred <= leaseValues.leaseSecondsMax)) {
152 throw new DBErrors.DataValidation('lease durations violate numerical ordering');
153 }
154 }
155
156
157 /**
158 * Basic field validation for setting topic data.
159 * @param {Object} data
160 */
161 _topicSetDataValidate(data) {
162 this._ensureTypes(data, ['url'], ['string']);
163 this._ensureTypes(data, ['publisherValidationUrl'], ['string', 'undefined', 'null']);
164 this._leaseDurationsValidate(data);
165 }
166
167
168 /**
169 * Basic field validation for setting topic content.
170 * @param {Object} data
171 */
172 _topicSetContentDataValidate(data) {
173 this._ensureTypes(data, ['content'], ['string', 'buffer']);
174 this._ensureTypes(data, ['contentHash'], ['string']);
175 this._ensureTypes(data, ['contentType'], ['string', 'null', 'undefined']);
176 this._ensureTypes(data, ['eTag'], ['string', 'null', 'undefined']);
177 this._ensureTypes(data, ['lastModified'], ['string', 'null', 'undefined']);
178 }
179
180
181 /**
182 * Basic field validation for updating topic.
183 * @param {Object} data
184 */
185 _topicUpdateDataValidate(data) {
186 this._ensureTypes(data, ['publisherValidationUrl'], ['string', 'undefined', 'null']);
187 if (data.publisherValidationUrl) {
188 try {
189 new URL(data.publisherValidationUrl);
190 } catch (e) {
191 throw new DBErrors.DataValidation('invalid URL format');
192 }
193 }
194 this._ensureTypes(data, ['contentHashAlgorithm'], ['string']);
195 if (!common.validHash(data.contentHashAlgorithm)) {
196 throw new DBErrors.DataValidation('unsupported hash algorithm');
197 }
198 this._leaseDurationsValidate(data);
199 }
200
201
202 /**
203 * Basic field validation for setting verification data.
204 * @param {Object} data
205 */
206 _verificationDataValidate(data) {
207 this._ensureTypes(data, ['topicId'], ['string', 'number']);
208 this._ensureTypes(data, ['callback', 'mode'], ['string']);
209 this._ensureTypes(data, ['secret', 'httpRemoteAddr', 'httpFrom', 'requestId'], ['string', 'null', 'undefined']);
210 this._ensureTypes(data, ['leaseSeconds'], ['number']);
211 this._ensureTypes(data, ['isPublisherValidated'], ['boolean']);
212 }
213
214
215 /**
216 * Basic field validation for updating verification data.
217 * @param {Object} verification
218 */
219 _verificationUpdateDataValidate(data) {
220 this._ensureTypes(data, ['verificationId'], ['string', 'number']);
221 this._ensureTypes(data, ['mode'], ['string']);
222 this._ensureTypes(data, ['reason'], ['string', 'null', 'undefined']);
223 this._ensureTypes(data, ['isPublisherValidated'], ['boolean']);
224 }
225
226
227 /**
228 * Basic field validation for upserting subscription data.
229 * @param {Object} subscription
230 */
231 _subscriptionUpsertDataValidate(data) {
232 this._ensureTypes(data, ['topicId'], ['string', 'number']);
233 this._ensureTypes(data, ['callback'], ['string']);
234 this._ensureTypes(data, ['leaseSeconds'], ['number']);
235 this._ensureTypes(data, ['secret', 'httpRemoteAddr', 'httpFrom'], ['string', 'null', 'undefined']);
236 }
237
238
239 _subscriptionUpdateDataValidate(data) {
240 this._ensureTypes(data, ['signatureAlgorithm'], ['string', 'null', 'undefined']);
241 if (!common.validHash(data.signatureAlgorithm)) {
242 throw new DBErrors.DataValidation('unsupported hash algorithm');
243 }
244
245 }
246
247 /* Interface methods */
248
249 /**
250 * Normalize query information to a common form from a specific backend.
251 * @param {*} result
252 * @returns {Object} info
253 * @returns {Number} info.changes
254 * @returns {*} info.lastInsertRowid
255 * @returns {Number} info.duration
256 */
257 _engineInfo(result) {
258 this._notImplemented('engineInfo', arguments);
259 }
260
261
262 /**
263 * Query the current schema version.
264 * This is a standalone query function, as it is called before statements are loaded.
265 * @returns {Object} version
266 * @returns {Number} version.major
267 * @returns {Number} version.minor
268 * @returns {Number} version.patch
269 */
270 async _currentSchema() {
271 this._notImplemented('_currentSchema', arguments);
272 }
273
274
275 /**
276 * Wrap a function call in a database context.
277 * @param {Function} fn fn(ctx)
278 */
279 async context(fn) {
280 this._notImplemented('context', arguments);
281 }
282
283
284 /**
285 * Wrap a function call in a transaction context.
286 * @param {*} dbCtx
287 * @param {Function} fn fn(txCtx)
288 */
289 async transaction(dbCtx, fn) {
290 this._notImplemented('transaction', arguments);
291 }
292
293
294 /**
295 * Store an authentication success event.
296 * @param {*} dbCtx
297 * @param {String} identifier
298 */
299 async authenticationSuccess(dbCtx, identifier) {
300 this._notImplemented('authenticationSuccess', arguments);
301 }
302
303
304 /**
305 * Fetch authentication data for identifier.
306 * @param {*} dbCtx
307 * @param {*} identifier
308 */
309 async authenticationGet(dbCtx, identifier) {
310 this._notImplemented('authenticationGet', arguments);
311 }
312
313
314 /**
315 * Create or update an authentication entity.
316 * @param {*} dbCtx
317 * @param {String} identifier
318 * @param {String} credential
319 */
320 async authenticationUpsert(dbCtx, identifier, credential) {
321 this._notImplemented('authenticationUpsert', arguments);
322 }
323
324
325 /**
326 * All subscriptions to a topic.
327 * @param {*} dbCtx
328 * @param {String} topicId
329 */
330 async subscriptionsByTopicId(dbCtx, topicId) {
331 this._notImplemented('subscriptionsByTopicId', arguments);
332 }
333
334
335 /**
336 * Number of subscriptions to a topic.
337 * @param {*} dbCtx
338 * @param {String} topicUrl
339 */
340 async subscriptionCountByTopicUrl(dbCtx, topicUrl) {
341 this._notImplemented('subscriptionCountByTopicUrl', arguments);
342 }
343
344
345 /**
346 * Remove an existing subscription.
347 * @param {*} dbCtx
348 * @param {String} callback
349 * @param {*} topicId
350 */
351 async subscriptionDelete(dbCtx, callback, topicId) {
352 this._notImplemented('subscriptionDelete', arguments);
353 }
354
355
356 /**
357 * Remove any expired subscriptions to a topic.
358 * @param {*} dbCtx
359 * @param {*} topicId
360 */
361 async subscriptionDeleteExpired(dbCtx, topicId) {
362 this._notImplemented('subscriptionDeleteExpired', arguments);
363 }
364
365
366 /**
367 * Claim subscriptions needing content updates attempted.
368 * @param {*} dbCtx
369 * @param {Number} wanted maximum subscription updates to claim
370 * @param {Integer} claimTimeoutSeconds age of claimed updates to reclaim
371 * @param {String} claimant
372 * @returns {Array} list of subscriptions
373 */
374 async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
375 this._notImplemented('subscriptionDeliveryClaim', arguments);
376 }
377
378
379 /**
380 * Claim a subscription delivery.
381 * @param {*} dbCtx
382 * @param {*} subscriptionId
383 * @param {*} claimTimeoutSeconds
384 * @param {*} claimant
385 */
386 async subscriptionDeliveryClaimById(dbCtx, subscriptionId, claimTimeoutSeconds, claimant) {
387 this._notImplemented('subscriptionDeliveryClaimById', arguments);
388 }
389
390
391 /**
392 * A subscriber successfully received new topic content, update subscription.
393 * @param {*} dbCtx
394 * @param {String} callback
395 * @param {*} topicId
396 */
397 async subscriptionDeliveryComplete(dbCtx, callback, topicId) {
398 this._notImplemented('subscriptionDeliveryComplete', arguments);
399 }
400
401
402 /**
403 * A subscriber denied new topic content, remove subscription.
404 * @param {*} dbCtx
405 * @param {String} callback
406 * @param {*} topicId
407 */
408 async subscriptionDeliveryGone(dbCtx, callback, topicId) {
409 this._notImplemented('subscriptionDeliveryGone', arguments);
410 }
411
412
413 /**
414 * An attempt to deliver content to a subscriber did not complete, update delivery accordingly.
415 * @param {*} dbCtx
416 * @param {String} callback
417 * @param {*} topicId
418 * @param {Number[]} retryDelays
419 */
420 async subscriptionDeliveryIncomplete(dbCtx, callback, topicId, retryDelays) {
421 this._notImplemented('subscriptionDeliveryIncomplete', arguments);
422 }
423
424
425 /**
426 * Fetch subscription details
427 * @param {*} dbCtx
428 * @param {String} callback
429 * @param {*} topicId
430 */
431 async subscriptionGet(dbCtx, callback, topicId) {
432 this._notImplemented('subscriptionGet', arguments);
433 }
434
435
436 /**
437 * Fetch subscription details
438 * @param {*} dbCtx
439 * @param {*} subscriptionId
440 */
441 async subscriptionGetById(dbCtx, subscriptionId) {
442 this._notImplemented('subscriptionGetById', arguments);
443 }
444
445
446 /**
447 * Set subscription details
448 * @param {*} dbCtx
449 * @param {Object} data
450 * @param {String} data.callback
451 * @param {*} data.topicId
452 * @param {Number} data.leaseSeconds
453 * @param {String=} data.secret
454 * @param {String=} data.httpRemoteAddr
455 * @param {String=} data.httpFrom
456 */
457 async subscriptionUpsert(dbCtx, data) {
458 this._notImplemented('subscriptionUpsert', arguments);
459 }
460
461
462 /**
463 * Set some subscription fields
464 * @param {*} dbCtx
465 * @param {Object} data
466 * @param {*} data.subscriptionId
467 * @param {String} data.signatureAlgorithm
468 */
469 async subscriptionUpdate(dbCtx, data) {
470 this._notImplemented('subscriptionUpdate', arguments);
471 }
472
473
474 /**
475 * Sets the isDeleted flag on a topic, and reset update time.
476 * @param {*} txCtx
477 * @param {*} topicId
478 */
479 async topicDeleted(dbCtx, topicId) {
480 this._notImplemented('topicDeleted', arguments);
481 }
482
483
484 /**
485 * Claim topics to fetch updates for, from available.
486 * @param {*} dbCtx
487 * @param {Integer} wanted maximum topic fetches to claim
488 * @param {Integer} claimTimeoutSeconds age of claimed topics to reclaim
489 * @param {String} claimant node id claiming these fetches
490 */
491 async topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
492 this._notImplemented('topicFetchClaim', arguments);
493 }
494
495
496 /**
497 * Claim a topic to update.
498 * @param {*} dbCtx
499 * @param {*} topicId
500 * @param {Integer} claimTimeoutSeconds age of claimed topics to reclaim
501 * @param {String} claimant node id claiming these fetches
502 */
503 async topicFetchClaimById(dbCtx, topicId, claimTimeoutSeconds, claimant) {
504 this._notImplemented('topicFetchClaim', arguments);
505 }
506
507
508 /**
509 * Reset publish state, and reset deliveries for subscribers.
510 * @param {*} dbCtx
511 * @param {*} topicId
512 */
513 async topicFetchComplete(dbCtx, topicId) {
514 this._notImplemented('topicFetchComplete', arguments);
515 }
516
517
518 /**
519 * Bump count of attempts and release claim on update.
520 * @param {*} dbCtx
521 * @param {*} topicId
522 * @param {Number[]} retryDelays
523 */
524 async topicFetchIncomplete(dbCtx, topicId, retryDelays) {
525 this._notImplemented('topicFetchIncomplete', arguments);
526 }
527
528
529 /**
530 * Set a topic as ready to be checked for an update.
531 * @param {*} dbCtx
532 * @param {*} topicId
533 * @returns {Boolean}
534 */
535 async topicFetchRequested(dbCtx, topicId) {
536 this._notImplemented('topicPublish', arguments);
537 }
538
539
540 /**
541 * Get all data for all topics, including subscription count.
542 * @param {*} dbCtx
543 */
544 async topicGetAll(dbCtx) {
545 this._notImplemented('topicGetAll', arguments);
546 }
547
548
549 /**
550 * Get topic data, without content.
551 * @param {*} dbCtx
552 * @param {String} topicUrl
553 * @param {Boolean} applyDefaults
554 */
555 async topicGetByUrl(dbCtx, topicUrl, applyDefaults = true) {
556 this._notImplemented('topicGetByUrl', arguments);
557 }
558
559
560 /**
561 * Get topic data, without content.
562 * @param {*} dbCtx
563 * @param {*} topicId
564 * @param {Boolean} applyDefaults
565 */
566 async topicGetById(dbCtx, topicId, applyDefaults = true) {
567 this._notImplemented('topicGetById', arguments);
568 }
569
570
571 /**
572 * Returns topic data with content.
573 * @param {*} dbCx
574 * @param {*} topicId
575 */
576 async topicGetContentById(dbCx, topicId) {
577 this._notImplemented('topicGetContentById', arguments);
578 }
579
580
581 /**
582 * Attempt to delete a topic, which must be set isDeleted, if there
583 * are no more subscriptions belaying its removal.
584 * @param {*} topicId
585 */
586 async topicPendingDelete(dbCtx, topicId) {
587 this._notImplemented('topicPendingDelete', arguments);
588 }
589
590
591 /**
592 * Return an array of the counts of the last #days of topic updates.
593 * @param {*} dbCtx
594 * @param {*} topicId
595 * @param {Number} days
596 * @returns {Number[]}
597 */
598 async topicPublishHistory(dbCtx, topicId, days) {
599 this._notImplemented('topicPublishHistory', arguments);
600 }
601
602
603 /**
604 * Create or update the basic parameters of a topic.
605 * @param {*} dbCtx
606 * @param {TopicData} data
607 */
608 async topicSet(dbCtx, data) {
609 this._notImplemented('topicSet', arguments);
610 }
611
612
613 /**
614 * Updates a topic's content data and content update timestamp.
615 * @param {Object} data
616 * @param {*} data.topicId
617 * @param {String} data.content
618 * @param {String} data.contentHash
619 * @param {String=} data.contentType
620 * @param {String=} data.eTag
621 * @param {String=} data.lastModified
622 */
623 async topicSetContent(dbCtx, data) {
624 this._notImplemented('topicSetContent', arguments);
625 }
626
627
628 /**
629 * Set some topic fields.
630 * @param {*} dbCtx
631 * @param {Object} data
632 * @param {*} data.topicId
633 * @param {Number=} data.leaseSecondsPreferred
634 * @param {Number=} data.leaseSecondsMin
635 * @param {Number=} data.leaseSecondsMax
636 * @param {String=} data.publisherValidationUrl
637 * @param {String=} data.contentHashAlgorithm
638 */
639 async topicUpdate(dbCtx, data) {
640 this._notImplemented('topicUpdate', arguments);
641 }
642
643
644 /**
645 * Claim pending verifications for attempted resolution.
646 * @param {*} dbCtx
647 * @param {Integer} wanted maximum verifications to claim
648 * @param {Integer} claimTimeoutSeconds age of claimed verifications to reclaim
649 * @returns {Verification[]} array of claimed verifications
650 */
651 async verificationClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
652 this._notImplemented('verificationClaim', arguments);
653 }
654
655
656 /**
657 * Claim a specific verification by id, if no other similar verification claimed.
658 * @param {*} dbCtx
659 * @param {*} verificationId
660 * @param {Number} claimTimeoutSeconds
661 * @param {String} claimant
662 */
663 async verificationClaimById(dbCtx, verificationId, claimTimeoutSeconds, claimant) {
664 this._notImplemented('verificationClaimById', arguments);
665 }
666
667
668 /**
669 * Remove the verification, any older
670 * verifications for that same client/topic, and the claim.
671 * @param {*} dbCtx
672 * @param {*} verificationId
673 * @param {String} callback
674 * @param {*} topicId
675 */
676 async verificationComplete(dbCtx, verificationId, callback, topicId) {
677 this._notImplemented('verificationComplete', arguments);
678 }
679
680
681 /**
682 * Get verification data.
683 * @param {*} dbCtx
684 * @param {*} verificationId
685 */
686 async verificationGetById(dbCtx, verificationId) {
687 this._notImplemented('verificationGetById', arguments);
688 }
689
690
691 /**
692 * Update database that a client verification was unable to complete.
693 * This releases the delivery claim and reschedules for some future time.
694 * @param {*} dbCtx
695 * @param {String} callback client callback url
696 * @param {*} topicId internal topic id
697 * @param {Number[]} retryDelays
698 */
699 async verificationIncomplete(dbCtx, verificationId, retryDelays) {
700 this._notImplemented('verificationIncomplete', arguments);
701 }
702
703
704 /**
705 * Create a new pending verification.
706 * @param {*} dbCtx
707 * @param {VerificationData} data
708 * @param {Boolean} claim
709 * @returns {*} verificationId
710 */
711 async verificationInsert(dbCtx, verification) {
712 this._notImplemented('verificationInsert', arguments);
713 }
714
715
716 /**
717 * Relinquish the claim on a verification, without any other updates.
718 * @param {*} dbCtx
719 * @param {String} callback client callback url
720 * @param {*} topicId internal topic id
721 */
722 async verificationRelease(dbCtx, verificationId) {
723 this._notImplemented('verificationRelease', arguments);
724 }
725
726
727 /**
728 * Updates some fields of an existing (presumably claimed) verification.
729 * @param {*} dbCtx
730 * @param {*} verificationId
731 * @param {Object} data
732 * @param {String} data.mode
733 * @param {String} data.reason
734 * @param {Boolean} data.isPublisherValidated
735 */
736 async verificationUpdate(dbCtx, verificationId, data) {
737 this._notImplemented('verificationUpdate', arguments);
738 }
739
740
741 /**
742 * Sets the isPublisherValidated flag on a verification and resets the delivery
743 * @param {*} dbCtx
744 * @param {*} verificationId
745 */
746 async verificationValidated(dbCtx, verificationId) {
747 this._notImplemented('verificationValidated', arguments);
748 }
749
750 }
751
752 module.exports = Database;