database migration 1.0.4, store topic fetch etag/last-modified, provide these when...
[websub-hub] / src / db / base.js
1 /* eslint-disable no-unused-vars */
2 'use strict';
3
4 /**
5 * This is the semi-abstract database class, providing interface and utility methods.
6 */
7
8 const common = require('../common');
9 const DBErrors = require('./errors');
10 const svh = require('./schema-version-helper');
11
12 const _fileScope = common.fileScope(__filename);
13
class Database {
  /**
   * Semi-abstract database base class: provides shared validation and utility
   * helpers, and declares the interface methods an engine must implement.
   * @param {Object} logger logger instance; passed through common.ensureLoggerLevels
   * @param {Object} options
   * @param {Object=} options.topicLeaseDefaults overrides merged over common.topicLeaseDefaults()
   */
  constructor(logger = common.nullLogger, options = {}) {
    this.logger = logger;
    common.ensureLoggerLevels(this.logger);

    // Store the merged config and default values for lease values.
    // N.B. breaking hierarchy of config options here
    this.topicLeaseDefaults = {};
    common.setOptions(this.topicLeaseDefaults, common.topicLeaseDefaults(), options.topicLeaseDefaults || {});
  }


  /**
   * Turn a snake into a camel.
   * Used when translating SQL column names to JS object style.
   * The first word is left untouched; each following word has its first
   * character upper-cased.
   * @param {String} snakeCase
   * @param {String|RegExp} delimiter
   * @returns {String|undefined} undefined if the input is falsy or cannot be split
   */
  static _camelfy(snakeCase, delimiter = '_') {
    if (!snakeCase || typeof snakeCase.split !== 'function') {
      return undefined;
    }
    const words = snakeCase.split(delimiter);
    return [
      words.shift(),
      ...words.map((word) => word.charAt(0).toUpperCase() + word.slice(1)),
    ].join('');
  }


  /**
   * Basic type checking of object properties.
   * Each listed property must have a `typeof` present in `types`, with these
   * additional allowances: a Buffer matches 'buffer', a null value matches
   * 'null', and a bigint matches 'number'.
   * @param {Object} object
   * @param {String[]} properties
   * @param {String[]} types
   * @throws {DBErrors.DataValidation} on a missing argument or a mismatched property
   */
  _ensureTypes(object, properties, types) {
    const _scope = _fileScope('_ensureTypes');

    if (!(object && properties && types)) {
      this.logger.error(_scope, 'undefined argument', { object, properties, types });
      throw new DBErrors.DataValidation();
    }
    properties.forEach((p) => {
      // eslint-disable-next-line security/detect-object-injection
      const pObj = object[p];
      const pType = typeof pObj;
      if (!types.includes(pType)
      &&  !(pObj instanceof Buffer && types.includes('buffer'))
      &&  !(pObj === null && types.includes('null'))
      &&  !(pType === 'bigint' && types.includes('number'))) {
        const reason = `'${p}' is '${pType}', but must be ${types.length > 1 ? 'one of ' : ''}'${types}'`;
        this.logger.error(_scope, reason, {});
        throw new DBErrors.DataValidation(reason);
      }
    });
  }


  /**
   * Interface methods need implementations.
   * Logs the call and always throws.
   * @param {String} method name of the unimplemented method
   * @param {arguments} args the caller's arguments object, logged as an array
   * @throws {DBErrors.NotImplemented}
   */
  _notImplemented(method, args) {
    this.logger.error(_fileScope(method), 'abstract method called', Array.from(args));
    throw new DBErrors.NotImplemented(method);
  }


  /**
   * Perform tasks needed to prepare database for use.  Ensure this is called
   * after construction, and before any other database activity.
   * At the minimum, this will validate a compatible schema is present and usable.
   * Some engines will also perform other initializations or async actions which
   * are easier handled outside the constructor.
   * Reads `this.schemaVersionsSupported` ({ min, max }), which this base class
   * does not set itself.
   * @throws {DBErrors.MigrationNeeded} when the current schema is outside the supported range
   */
  async initialize() {
    const _scope = _fileScope('initialize');

    const currentSchema = await this._currentSchema();
    const current = svh.schemaVersionObjectToNumber(currentSchema);
    const min = svh.schemaVersionObjectToNumber(this.schemaVersionsSupported.min);
    const max = svh.schemaVersionObjectToNumber(this.schemaVersionsSupported.max);
    if (current >= min && current <= max) {
      this.logger.debug(_scope, 'schema supported', { currentSchema, schemaVersionsSupported: this.schemaVersionsSupported });
    } else {
      this.logger.error(_scope, 'schema not supported', { currentSchema, schemaVersionsSupported: this.schemaVersionsSupported });
      throw new DBErrors.MigrationNeeded();
    }
  }


  /**
   * Perform db connection healthcheck.
   */
  async healthCheck() {
    this._notImplemented('healthCheck', arguments);
  }


  /**
   * Replace any NULL from topic DB entry with default values.
   * Lease default keys which are absent from the topic are also populated.
   * The topic object is modified in place.
   * @param {Object} topic
   * @returns {Object} the same topic object (or the falsy value passed in)
   */
  _topicDefaults(topic) {
    if (topic) {
      for (const [key, value] of Object.entries(this.topicLeaseDefaults)) {
        // eslint-disable-next-line security/detect-object-injection
        if (!(key in topic) || topic[key] === null) {
          // eslint-disable-next-line security/detect-object-injection
          topic[key] = value;
        }
      }
    }
    return topic;
  }


  /**
   * Ensures any lease durations in data are consistent.
   * Every lease value (after defaults are applied) must be positive, and
   * min <= preferred <= max must hold.
   * @param {Object} data
   * @throws {DBErrors.DataValidation}
   */
  _leaseDurationsValidate(data) {
    const leaseProperties = Object.keys(this.topicLeaseDefaults);
    this._ensureTypes(data, leaseProperties, ['number', 'undefined', 'null']);

    // Populate defaults on a copy of values so we can check proper numerical ordering
    const leaseValues = common.pick(data, leaseProperties);
    this._topicDefaults(leaseValues);
    for (const [prop, value] of Object.entries(leaseValues)) {
      if (value <= 0) {
        throw new DBErrors.DataValidation(`${prop} must be positive`);
      }
    }
    if (!(leaseValues.leaseSecondsMin <= leaseValues.leaseSecondsPreferred && leaseValues.leaseSecondsPreferred <= leaseValues.leaseSecondsMax)) {
      throw new DBErrors.DataValidation('lease durations violate numerical ordering');
    }
  }


  /**
   * Basic field validation for setting topic data.
   * @param {Object} data
   * @throws {DBErrors.DataValidation}
   */
  _topicSetDataValidate(data) {
    this._ensureTypes(data, ['url'], ['string']);
    this._ensureTypes(data, ['publisherValidationUrl'], ['string', 'undefined', 'null']);
    this._leaseDurationsValidate(data);
  }


  /**
   * Basic field validation for setting topic content.
   * @param {Object} data
   * @throws {DBErrors.DataValidation}
   */
  _topicSetContentDataValidate(data) {
    this._ensureTypes(data, ['content'], ['string', 'buffer']);
    this._ensureTypes(data, ['contentHash'], ['string']);
    this._ensureTypes(data, ['contentType'], ['string', 'null', 'undefined']);
    this._ensureTypes(data, ['eTag'], ['string', 'null', 'undefined']);
    this._ensureTypes(data, ['lastModified'], ['string', 'null', 'undefined']);
  }


  /**
   * Basic field validation for updating topic.
   * @param {Object} data
   * @throws {DBErrors.DataValidation}
   */
  _topicUpdateDataValidate(data) {
    this._ensureTypes(data, ['publisherValidationUrl'], ['string', 'undefined', 'null']);
    if (data.publisherValidationUrl) {
      try {
        // Constructed only for its parse-or-throw side effect.
        new URL(data.publisherValidationUrl);
      } catch (e) {
        throw new DBErrors.DataValidation('invalid URL format');
      }
    }
    this._ensureTypes(data, ['contentHashAlgorithm'], ['string']);
    if (!common.validHash(data.contentHashAlgorithm)) {
      throw new DBErrors.DataValidation('unsupported hash algorithm');
    }
    this._leaseDurationsValidate(data);
  }


  /**
   * Basic field validation for setting verification data.
   * @param {Object} data
   * @throws {DBErrors.DataValidation}
   */
  _verificationDataValidate(data) {
    this._ensureTypes(data, ['topicId'], ['string', 'number']);
    this._ensureTypes(data, ['callback', 'mode'], ['string']);
    this._ensureTypes(data, ['secret', 'httpRemoteAddr', 'httpFrom', 'requestId'], ['string', 'null', 'undefined']);
    this._ensureTypes(data, ['leaseSeconds'], ['number']);
    this._ensureTypes(data, ['isPublisherValidated'], ['boolean']);
  }


  /**
   * Basic field validation for updating verification data.
   * @param {Object} data
   * @throws {DBErrors.DataValidation}
   */
  _verificationUpdateDataValidate(data) {
    this._ensureTypes(data, ['verificationId'], ['string', 'number']);
    this._ensureTypes(data, ['mode'], ['string']);
    this._ensureTypes(data, ['reason'], ['string', 'null', 'undefined']);
    this._ensureTypes(data, ['isPublisherValidated'], ['boolean']);
  }


  /**
   * Basic field validation for upserting subscription data.
   * @param {Object} data
   * @throws {DBErrors.DataValidation}
   */
  _subscriptionUpsertDataValidate(data) {
    this._ensureTypes(data, ['topicId'], ['string', 'number']);
    this._ensureTypes(data, ['callback'], ['string']);
    this._ensureTypes(data, ['leaseSeconds'], ['number']);
    this._ensureTypes(data, ['secret', 'httpRemoteAddr', 'httpFrom'], ['string', 'null', 'undefined']);
  }


  /**
   * Basic field validation for updating subscription data.
   * NOTE(review): the type check admits null/undefined, yet the value is then
   * passed to common.validHash unconditionally -- confirm whether an absent
   * signatureAlgorithm is meant to be accepted.
   * @param {Object} data
   * @throws {DBErrors.DataValidation}
   */
  _subscriptionUpdateDataValidate(data) {
    this._ensureTypes(data, ['signatureAlgorithm'], ['string', 'null', 'undefined']);
    if (!common.validHash(data.signatureAlgorithm)) {
      throw new DBErrors.DataValidation('unsupported hash algorithm');
    }
  }


  /* Interface methods */

  /**
   * Normalize query information to a common form from a specific backend.
   * @param {*} result
   * @returns {Object} info
   * @returns {Number} info.changes
   * @returns {*} info.lastInsertRowid
   * @returns {Number} info.duration
   */
  _engineInfo(result) {
    this._notImplemented('_engineInfo', arguments);
  }


  /**
   * Query the current schema version.
   * This is a standalone query function, as it is called before statements are loaded.
   * @returns {Object} version
   * @returns {Number} version.major
   * @returns {Number} version.minor
   * @returns {Number} version.patch
   */
  async _currentSchema() {
    this._notImplemented('_currentSchema', arguments);
  }


  /**
   * Wrap a function call in a database context.
   * @param {Function} fn fn(ctx)
   */
  async context(fn) {
    this._notImplemented('context', arguments);
  }


  /**
   * Wrap a function call in a transaction context.
   * @param {*} dbCtx
   * @param {Function} fn fn(txCtx)
   */
  async transaction(dbCtx, fn) {
    this._notImplemented('transaction', arguments);
  }


  /**
   * Store an authentication success event.
   * @param {*} dbCtx
   * @param {String} identifier
   */
  async authenticationSuccess(dbCtx, identifier) {
    this._notImplemented('authenticationSuccess', arguments);
  }


  /**
   * Fetch authentication data for identifier.
   * @param {*} dbCtx
   * @param {*} identifier
   */
  async authenticationGet(dbCtx, identifier) {
    this._notImplemented('authenticationGet', arguments);
  }


  /**
   * Create or update an authentication entity.
   * @param {*} dbCtx
   * @param {String} identifier
   * @param {String} credential
   */
  async authenticationUpsert(dbCtx, identifier, credential) {
    this._notImplemented('authenticationUpsert', arguments);
  }


  /**
   * All subscriptions to a topic.
   * @param {*} dbCtx
   * @param {String} topicId
   */
  async subscriptionsByTopicId(dbCtx, topicId) {
    this._notImplemented('subscriptionsByTopicId', arguments);
  }


  /**
   * Number of subscriptions to a topic.
   * @param {*} dbCtx
   * @param {String} topicUrl
   */
  async subscriptionCountByTopicUrl(dbCtx, topicUrl) {
    this._notImplemented('subscriptionCountByTopicUrl', arguments);
  }


  /**
   * Remove an existing subscription.
   * @param {*} dbCtx
   * @param {String} callback
   * @param {*} topicId
   */
  async subscriptionDelete(dbCtx, callback, topicId) {
    this._notImplemented('subscriptionDelete', arguments);
  }


  /**
   * Remove any expired subscriptions to a topic.
   * @param {*} dbCtx
   * @param {*} topicId
   */
  async subscriptionDeleteExpired(dbCtx, topicId) {
    this._notImplemented('subscriptionDeleteExpired', arguments);
  }


  /**
   * Claim subscriptions needing content updates attempted.
   * @param {*} dbCtx
   * @param {Number} wanted maximum subscription updates to claim
   * @param {Integer} claimTimeoutSeconds age of claimed updates to reclaim
   * @param {String} claimant
   * @returns {Array} list of subscriptions
   */
  async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
    this._notImplemented('subscriptionDeliveryClaim', arguments);
  }


  /**
   * Claim a subscription delivery.
   * @param {*} dbCtx
   * @param {*} subscriptionId
   * @param {*} claimTimeoutSeconds
   * @param {*} claimant
   */
  async subscriptionDeliveryClaimById(dbCtx, subscriptionId, claimTimeoutSeconds, claimant) {
    this._notImplemented('subscriptionDeliveryClaimById', arguments);
  }


  /**
   * A subscriber successfully received new topic content, update subscription.
   * @param {*} dbCtx
   * @param {String} callback
   * @param {*} topicId
   */
  async subscriptionDeliveryComplete(dbCtx, callback, topicId) {
    this._notImplemented('subscriptionDeliveryComplete', arguments);
  }


  /**
   * A subscriber denied new topic content, remove subscription.
   * @param {*} dbCtx
   * @param {String} callback
   * @param {*} topicId
   */
  async subscriptionDeliveryGone(dbCtx, callback, topicId) {
    this._notImplemented('subscriptionDeliveryGone', arguments);
  }


  /**
   * An attempt to deliver content to a subscriber did not complete, update delivery accordingly.
   * @param {*} dbCtx
   * @param {String} callback
   * @param {*} topicId
   * @param {Number[]} retryDelays
   */
  async subscriptionDeliveryIncomplete(dbCtx, callback, topicId, retryDelays) {
    this._notImplemented('subscriptionDeliveryIncomplete', arguments);
  }


  /**
   * Fetch subscription details
   * @param {*} dbCtx
   * @param {String} callback
   * @param {*} topicId
   */
  async subscriptionGet(dbCtx, callback, topicId) {
    this._notImplemented('subscriptionGet', arguments);
  }


  /**
   * Fetch subscription details
   * @param {*} dbCtx
   * @param {*} subscriptionId
   */
  async subscriptionGetById(dbCtx, subscriptionId) {
    this._notImplemented('subscriptionGetById', arguments);
  }


  /**
   * Set subscription details
   * @param {*} dbCtx
   * @param {Object} data
   * @param {String} data.callback
   * @param {*} data.topicId
   * @param {Number} data.leaseSeconds
   * @param {String=} data.secret
   * @param {String=} data.httpRemoteAddr
   * @param {String=} data.httpFrom
   */
  async subscriptionUpsert(dbCtx, data) {
    this._notImplemented('subscriptionUpsert', arguments);
  }


  /**
   * Set some subscription fields
   * @param {*} dbCtx
   * @param {Object} data
   * @param {*} data.subscriptionId
   * @param {String} data.signatureAlgorithm
   */
  async subscriptionUpdate(dbCtx, data) {
    this._notImplemented('subscriptionUpdate', arguments);
  }


  /**
   * Sets the isDeleted flag on a topic, and reset update time.
   * @param {*} dbCtx
   * @param {*} topicId
   */
  async topicDeleted(dbCtx, topicId) {
    this._notImplemented('topicDeleted', arguments);
  }


  /**
   * Claim topics to fetch updates for, from available.
   * @param {*} dbCtx
   * @param {Integer} wanted maximum topic fetches to claim
   * @param {Integer} claimTimeoutSeconds age of claimed topics to reclaim
   * @param {String} claimant node id claiming these fetches
   */
  async topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
    this._notImplemented('topicFetchClaim', arguments);
  }


  /**
   * Claim a topic to update.
   * @param {*} dbCtx
   * @param {*} topicId
   * @param {Integer} claimTimeoutSeconds age of claimed topics to reclaim
   * @param {String} claimant node id claiming these fetches
   */
  async topicFetchClaimById(dbCtx, topicId, claimTimeoutSeconds, claimant) {
    this._notImplemented('topicFetchClaimById', arguments);
  }


  /**
   * Reset publish state, and reset deliveries for subscribers.
   * @param {*} dbCtx
   * @param {*} topicId
   */
  async topicFetchComplete(dbCtx, topicId) {
    this._notImplemented('topicFetchComplete', arguments);
  }


  /**
   * Bump count of attempts and release claim on update.
   * @param {*} dbCtx
   * @param {*} topicId
   * @param {Number[]} retryDelays
   */
  async topicFetchIncomplete(dbCtx, topicId, retryDelays) {
    this._notImplemented('topicFetchIncomplete', arguments);
  }


  /**
   * Set a topic as ready to be checked for an update.
   * @param {*} dbCtx
   * @param {*} topicId
   * @returns {Boolean}
   */
  async topicFetchRequested(dbCtx, topicId) {
    this._notImplemented('topicFetchRequested', arguments);
  }


  /**
   * Get all data for all topics, including subscription count.
   * @param {*} dbCtx
   */
  async topicGetAll(dbCtx) {
    this._notImplemented('topicGetAll', arguments);
  }


  /**
   * Get topic data, without content.
   * @param {*} dbCtx
   * @param {String} topicUrl
   */
  async topicGetByUrl(dbCtx, topicUrl) {
    this._notImplemented('topicGetByUrl', arguments);
  }


  /**
   * Get topic data, without content.
   * @param {*} dbCtx
   * @param {*} topicId
   * @param {Boolean} applyDefaults
   */
  async topicGetById(dbCtx, topicId, applyDefaults = true) {
    this._notImplemented('topicGetById', arguments);
  }


  /**
   * Returns topic data with content.
   * @param {*} dbCx
   * @param {*} topicId
   */
  async topicGetContentById(dbCx, topicId) {
    this._notImplemented('topicGetContentById', arguments);
  }


  /**
   * Attempt to delete a topic, which must be set isDeleted, if there
   * are no more subscriptions belaying its removal.
   * @param {*} dbCtx
   * @param {*} topicId
   */
  async topicPendingDelete(dbCtx, topicId) {
    this._notImplemented('topicPendingDelete', arguments);
  }


  /**
   * Return an array of the counts of the last #days of topic updates.
   * @param {*} dbCtx
   * @param {*} topicId
   * @param {Number} days
   * @returns {Number[]}
   */
  async topicPublishHistory(dbCtx, topicId, days) {
    this._notImplemented('topicPublishHistory', arguments);
  }


  /**
   * Create or update the basic parameters of a topic.
   * @param {*} dbCtx
   * @param {TopicData} data
   */
  async topicSet(dbCtx, data) {
    this._notImplemented('topicSet', arguments);
  }


  /**
   * Updates a topic's content data and content update timestamp.
   * @param {*} dbCtx
   * @param {Object} data
   * @param {*} data.topicId
   * @param {String} data.content
   * @param {String} data.contentHash
   * @param {String=} data.contentType
   * @param {String=} data.eTag
   * @param {String=} data.lastModified
   */
  async topicSetContent(dbCtx, data) {
    this._notImplemented('topicSetContent', arguments);
  }


  /**
   * Set some topic fields.
   * @param {*} dbCtx
   * @param {Object} data
   * @param {*} data.topicId
   * @param {Number=} data.leaseSecondsPreferred
   * @param {Number=} data.leaseSecondsMin
   * @param {Number=} data.leaseSecondsMax
   * @param {String=} data.publisherValidationUrl
   * @param {String=} data.contentHashAlgorithm
   */
  async topicUpdate(dbCtx, data) {
    this._notImplemented('topicUpdate', arguments);
  }


  /**
   * Claim pending verifications for attempted resolution.
   * @param {*} dbCtx
   * @param {Integer} wanted maximum verifications to claim
   * @param {Integer} claimTimeoutSeconds age of claimed verifications to reclaim
   * @param {String} claimant
   * @returns {Verification[]} array of claimed verifications
   */
  async verificationClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
    this._notImplemented('verificationClaim', arguments);
  }


  /**
   * Claim a specific verification by id, if no other similar verification claimed.
   * @param {*} dbCtx
   * @param {*} verificationId
   * @param {Number} claimTimeoutSeconds
   * @param {String} claimant
   */
  async verificationClaimById(dbCtx, verificationId, claimTimeoutSeconds, claimant) {
    this._notImplemented('verificationClaimById', arguments);
  }


  /**
   * Remove the verification, any older
   * verifications for that same client/topic, and the claim.
   * @param {*} dbCtx
   * @param {*} verificationId
   * @param {String} callback
   * @param {*} topicId
   */
  async verificationComplete(dbCtx, verificationId, callback, topicId) {
    this._notImplemented('verificationComplete', arguments);
  }


  /**
   * Get verification data.
   * @param {*} dbCtx
   * @param {*} verificationId
   */
  async verificationGetById(dbCtx, verificationId) {
    this._notImplemented('verificationGetById', arguments);
  }


  /**
   * Update database that a client verification was unable to complete.
   * This releases the delivery claim and reschedules for some future time.
   * @param {*} dbCtx
   * @param {*} verificationId
   * @param {Number[]} retryDelays
   */
  async verificationIncomplete(dbCtx, verificationId, retryDelays) {
    this._notImplemented('verificationIncomplete', arguments);
  }


  /**
   * Create a new pending verification.
   * @param {*} dbCtx
   * @param {VerificationData} verification
   * @returns {*} verificationId
   */
  async verificationInsert(dbCtx, verification) {
    this._notImplemented('verificationInsert', arguments);
  }


  /**
   * Relinquish the claim on a verification, without any other updates.
   * @param {*} dbCtx
   * @param {*} verificationId
   */
  async verificationRelease(dbCtx, verificationId) {
    this._notImplemented('verificationRelease', arguments);
  }


  /**
   * Updates some fields of an existing (presumably claimed) verification.
   * @param {*} dbCtx
   * @param {*} verificationId
   * @param {Object} data
   * @param {String} data.mode
   * @param {String} data.reason
   * @param {Boolean} data.isPublisherValidated
   */
  async verificationUpdate(dbCtx, verificationId, data) {
    this._notImplemented('verificationUpdate', arguments);
  }


  /**
   * Sets the isPublisherValidated flag on a verification and resets the delivery
   * @param {*} dbCtx
   * @param {*} verificationId
   */
  async verificationValidated(dbCtx, verificationId) {
    this._notImplemented('verificationValidated', arguments);
  }

}
750
751 module.exports = Database;