1 /* eslint-disable no-unused-vars */
5 * This is the semi-abstract database class, providing interface and utility methods.
8 const common
= require('../common');
9 const DBErrors
= require('./errors');
10 const svh
= require('./schema-version-helper');
12 const _fileScope
= common
.fileScope(__filename
);
15 constructor(logger
= common
.nullLogger
, options
= {}) {
17 common
.ensureLoggerLevels(this.logger
);
19 // Store the merged config and default values for lease values.
20 // N.B. breaking hierarchy of config options here
21 this.topicLeaseDefaults
= {};
22 common
.setOptions(this.topicLeaseDefaults
, common
.topicLeaseDefaults(), options
.topicLeaseDefaults
|| {});
27 * Turn a snake into a camel.
28 * Used when translating SQL column names to JS object style.
29 * @param {String} snakeCase
30 * @param {String|RegExp} delimiter
33 static _camelfy(snakeCase
, delimiter
= '_') {
34 if (!snakeCase
|| typeof snakeCase
.split
!== 'function') {
37 const words
= snakeCase
.split(delimiter
);
40 ...words
.map((word
) => word
.charAt(0).toUpperCase() + word
.slice(1)),
46 * Basic type checking of object properties.
47 * @param {Object} object
48 * @param {String[]} properties
49 * @param {String[]} types
51 _ensureTypes(object
, properties
, types
) {
52 const _scope
= _fileScope('_ensureTypes');
54 if (!(object
&& properties
&& types
)) {
55 this.logger
.error(_scope
, 'undefined argument', { object
, properties
, types
});
56 throw new DBErrors
.DataValidation();
58 properties
.forEach((p
) => {
59 // eslint-disable-next-line security/detect-object-injection
60 const pObj
= object
[p
];
61 const pType
= typeof pObj
;
62 if (!types
.includes(pType
)
63 && !(pObj
instanceof Buffer
&& types
.includes('buffer'))
64 && !(pObj
=== null && types
.includes('null'))
65 && !(pType
=== 'bigint' && types
.includes('number'))) {
66 const reason
= `'${p}' is '${pType}', but must be ${types.length > 1 ? 'one of ' : ''}'${types}'`;
67 this.logger
.error(_scope
, reason
, {});
68 throw new DBErrors
.DataValidation(reason
);
75 * Interface methods need implementations.
76 * @param {String} method
77 * @param {arguments} args
79 _notImplemented(method
, args
) {
80 this.logger
.error(_fileScope(method
), 'abstract method called', Array
.from(args
));
81 throw new DBErrors
.NotImplemented(method
);
86 * Perform tasks needed to prepare database for use. Ensure this is called
87 * after construction, and before any other database activity.
88 * At the minimum, this will validate a compatible schema is present and usable.
89 * Some engines will also perform other initializations or async actions which
90 * are easier handled outside the constructor.
93 const _scope
= _fileScope('initialize');
95 const currentSchema
= await
this._currentSchema();
96 const current
= svh
.schemaVersionObjectToNumber(currentSchema
);
97 const min
= svh
.schemaVersionObjectToNumber(this.schemaVersionsSupported
.min
);
98 const max
= svh
.schemaVersionObjectToNumber(this.schemaVersionsSupported
.max
);
99 if (current
>= min
&& current
<= max
) {
100 this.logger
.debug(_scope
, 'schema supported', { currentSchema
, schemaVersionsSupported: this.schemaVersionsSupported
});
102 this.logger
.error(_scope
, 'schema not supported', { currentSchema
, schemaVersionsSupported: this.schemaVersionsSupported
});
103 throw new DBErrors
.MigrationNeeded();
109 * Perform db connection healthcheck.
111 async
healthCheck() {
112 this._notImplemented('healthCheck', arguments
);
117 * Replace any NULL from topic DB entry with default values.
118 * @param {Object} topic
121 _topicDefaults(topic
) {
123 for (const [key
, value
] of Object
.entries(this.topicLeaseDefaults
)) {
124 // eslint-disable-next-line security/detect-object-injection
125 if (!(key
in topic
) || topic
[key
] === null) {
126 // eslint-disable-next-line security/detect-object-injection
136 * Ensures any lease durations in data are consistent.
137 * @param {Object} data
139 _leaseDurationsValidate(data
) {
140 const leaseProperties
= Object
.keys(this.topicLeaseDefaults
);
141 this._ensureTypes(data
, leaseProperties
, ['number', 'undefined', 'null']);
143 // Populate defaults on a copy of values so we can check proper numerical ordering
144 const leaseValues
= common
.pick(data
, leaseProperties
);
145 this._topicDefaults(leaseValues
);
146 for (const [prop
, value
] of Object
.entries(leaseValues
)) {
148 throw new DBErrors
.DataValidation(`${prop} must be positive`);
151 if (!(leaseValues
.leaseSecondsMin
<= leaseValues
.leaseSecondsPreferred
&& leaseValues
.leaseSecondsPreferred
<= leaseValues
.leaseSecondsMax
)) {
152 throw new DBErrors
.DataValidation('lease durations violate numerical ordering');
158 * Basic field validation for setting topic data.
159 * @param {Object} data
161 _topicSetDataValidate(data
) {
162 this._ensureTypes(data
, ['url'], ['string']);
163 this._ensureTypes(data
, ['publisherValidationUrl'], ['string', 'undefined', 'null']);
164 this._leaseDurationsValidate(data
);
169 * Basic field validation for setting topic content.
170 * @param {Object} data
172 _topicSetContentDataValidate(data
) {
173 this._ensureTypes(data
, ['content'], ['string', 'buffer']);
174 this._ensureTypes(data
, ['contentHash'], ['string']);
175 this._ensureTypes(data
, ['contentType'], ['string', 'null', 'undefined']);
176 this._ensureTypes(data
, ['eTag'], ['string', 'null', 'undefined']);
177 this._ensureTypes(data
, ['lastModified'], ['string', 'null', 'undefined']);
182 * Basic field validation for updating topic.
183 * @param {Object} data
185 _topicUpdateDataValidate(data
) {
186 this._ensureTypes(data
, ['publisherValidationUrl'], ['string', 'undefined', 'null']);
187 if (data
.publisherValidationUrl
) {
189 new URL(data
.publisherValidationUrl
);
191 throw new DBErrors
.DataValidation('invalid URL format');
194 this._ensureTypes(data
, ['contentHashAlgorithm'], ['string']);
195 if (!common
.validHash(data
.contentHashAlgorithm
)) {
196 throw new DBErrors
.DataValidation('unsupported hash algorithm');
198 this._leaseDurationsValidate(data
);
203 * Basic field validation for setting verification data.
204 * @param {Object} data
206 _verificationDataValidate(data
) {
207 this._ensureTypes(data
, ['topicId'], ['string', 'number']);
208 this._ensureTypes(data
, ['callback', 'mode'], ['string']);
209 this._ensureTypes(data
, ['secret', 'httpRemoteAddr', 'httpFrom', 'requestId'], ['string', 'null', 'undefined']);
210 this._ensureTypes(data
, ['leaseSeconds'], ['number']);
211 this._ensureTypes(data
, ['isPublisherValidated'], ['boolean']);
216 * Basic field validation for updating verification data.
217 * @param {Object} verification
219 _verificationUpdateDataValidate(data
) {
220 this._ensureTypes(data
, ['verificationId'], ['string', 'number']);
221 this._ensureTypes(data
, ['mode'], ['string']);
222 this._ensureTypes(data
, ['reason'], ['string', 'null', 'undefined']);
223 this._ensureTypes(data
, ['isPublisherValidated'], ['boolean']);
228 * Basic field validation for upserting subscription data.
229 * @param {Object} subscription
231 _subscriptionUpsertDataValidate(data
) {
232 this._ensureTypes(data
, ['topicId'], ['string', 'number']);
233 this._ensureTypes(data
, ['callback'], ['string']);
234 this._ensureTypes(data
, ['leaseSeconds'], ['number']);
235 this._ensureTypes(data
, ['secret', 'httpRemoteAddr', 'httpFrom'], ['string', 'null', 'undefined']);
239 _subscriptionUpdateDataValidate(data
) {
240 this._ensureTypes(data
, ['signatureAlgorithm'], ['string', 'null', 'undefined']);
241 if (!common
.validHash(data
.signatureAlgorithm
)) {
242 throw new DBErrors
.DataValidation('unsupported hash algorithm');
247 /* Interface methods */
250 * Normalize query information to a common form from a specific backend.
252 * @returns {Object} info
253 * @returns {Number} info.changes
254 * @returns {*} info.lastInsertRowid
255 * @returns {Number} info.duration
257 _engineInfo(result
) {
258 this._notImplemented('engineInfo', arguments
);
263 * Query the current schema version.
264 * This is a standalone query function, as it is called before statements are loaded.
265 * @returns {Object} version
266 * @returns {Number} version.major
267 * @returns {Number} version.minor
268 * @returns {Number} version.patch
270 async
_currentSchema() {
271 this._notImplemented('_currentSchema', arguments
);
276 * Wrap a function call in a database context.
277 * @param {Function} fn fn(ctx)
280 this._notImplemented('context', arguments
);
285 * Wrap a function call in a transaction context.
287 * @param {Function} fn fn(txCtx)
289 async
transaction(dbCtx
, fn
) {
290 this._notImplemented('transaction', arguments
);
295 * Store an authentication success event.
297 * @param {String} identifier
299 async
authenticationSuccess(dbCtx
, identifier
) {
300 this._notImplemented('authenticationSuccess', arguments
);
305 * Fetch authentication data for identifier.
307 * @param {*} identifier
309 async
authenticationGet(dbCtx
, identifier
) {
310 this._notImplemented('authenticationGet', arguments
);
315 * Create or update an authentication entity.
317 * @param {String} identifier
318 * @param {String} credential
320 async
authenticationUpsert(dbCtx
, identifier
, credential
) {
321 this._notImplemented('authenticationUpsert', arguments
);
326 * All subscriptions to a topic.
328 * @param {String} topicId
330 async
subscriptionsByTopicId(dbCtx
, topicId
) {
331 this._notImplemented('subscriptionsByTopicId', arguments
);
336 * Number of subscriptions to a topic.
338 * @param {String} topicUrl
340 async
subscriptionCountByTopicUrl(dbCtx
, topicUrl
) {
341 this._notImplemented('subscriptionCountByTopicUrl', arguments
);
346 * Remove an existing subscription.
348 * @param {String} callback
351 async
subscriptionDelete(dbCtx
, callback
, topicId
) {
352 this._notImplemented('subscriptionDelete', arguments
);
357 * Remove any expired subscriptions to a topic.
361 async
subscriptionDeleteExpired(dbCtx
, topicId
) {
362 this._notImplemented('subscriptionDeleteExpired', arguments
);
367 * Claim subscriptions needing content updates attempted.
369 * @param {Number} wanted maximum subscription updates to claim
370 * @param {Integer} claimTimeoutSeconds age of claimed updates to reclaim
371 * @param {String} claimant
372 * @returns {Array} list of subscriptions
374 async
subscriptionDeliveryClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
375 this._notImplemented('subscriptionDeliveryClaim', arguments
);
380 * Claim a subscription delivery.
382 * @param {*} subscriptionId
383 * @param {*} claimTimeoutSeconds
384 * @param {*} claimant
386 async
subscriptionDeliveryClaimById(dbCtx
, subscriptionId
, claimTimeoutSeconds
, claimant
) {
387 this._notImplemented('subscriptionDeliveryClaimById', arguments
);
392 * A subscriber successfully received new topic content, update subscription.
394 * @param {String} callback
397 async
subscriptionDeliveryComplete(dbCtx
, callback
, topicId
) {
398 this._notImplemented('subscriptionDeliveryComplete', arguments
);
403 * A subscriber denied new topic content, remove subscription.
405 * @param {String} callback
408 async
subscriptionDeliveryGone(dbCtx
, callback
, topicId
) {
409 this._notImplemented('subscriptionDeliveryGone', arguments
);
414 * An attempt to deliver content to a subscriber did not complete, update delivery accordingly.
416 * @param {String} callback
418 * @param {Number[]} retryDelays
420 async
subscriptionDeliveryIncomplete(dbCtx
, callback
, topicId
, retryDelays
) {
421 this._notImplemented('subscriptionDeliveryIncomplete', arguments
);
426 * Fetch subscription details
428 * @param {String} callback
431 async
subscriptionGet(dbCtx
, callback
, topicId
) {
432 this._notImplemented('subscriptionGet', arguments
);
437 * Fetch subscription details
439 * @param {*} subscriptionId
441 async
subscriptionGetById(dbCtx
, subscriptionId
) {
442 this._notImplemented('subscriptionGetById', arguments
);
447 * Set subscription details
449 * @param {Object} data
450 * @param {String} data.callback
451 * @param {*} data.topicId
452 * @param {Number} data.leaseSeconds
453 * @param {String=} data.secret
454 * @param {String=} data.httpRemoteAddr
455 * @param {String=} data.httpFrom
457 async
subscriptionUpsert(dbCtx
, data
) {
458 this._notImplemented('subscriptionUpsert', arguments
);
463 * Set some subscription fields
465 * @param {Object} data
466 * @param {*} data.subscriptionId
467 * @param {String} data.signatureAlgorithm
469 async
subscriptionUpdate(dbCtx
, data
) {
470 this._notImplemented('subscriptionUpdate', arguments
);
475 * Sets the isDeleted flag on a topic, and reset update time.
479 async
topicDeleted(dbCtx
, topicId
) {
480 this._notImplemented('topicDeleted', arguments
);
485 * Claim topics to fetch updates for, from available.
487 * @param {Integer} wanted maximum topic fetches to claim
488 * @param {Integer} claimTimeoutSeconds age of claimed topics to reclaim
489 * @param {String} claimant node id claiming these fetches
491 async
topicFetchClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
492 this._notImplemented('topicFetchClaim', arguments
);
497 * Claim a topic to update.
500 * @param {Integer} claimTimeoutSeconds age of claimed topics to reclaim
501 * @param {String} claimant node id claiming these fetches
503 async
topicFetchClaimById(dbCtx
, topicId
, claimTimeoutSeconds
, claimant
) {
504 this._notImplemented('topicFetchClaim', arguments
);
509 * Reset publish state, and reset deliveries for subscribers.
513 async
topicFetchComplete(dbCtx
, topicId
) {
514 this._notImplemented('topicFetchComplete', arguments
);
519 * Bump count of attempts and release claim on update.
522 * @param {Number[]} retryDelays
524 async
topicFetchIncomplete(dbCtx
, topicId
, retryDelays
) {
525 this._notImplemented('topicFetchIncomplete', arguments
);
530 * Set a topic as ready to be checked for an update.
535 async
topicFetchRequested(dbCtx
, topicId
) {
536 this._notImplemented('topicPublish', arguments
);
541 * Get all data for all topics, including subscription count.
544 async
topicGetAll(dbCtx
) {
545 this._notImplemented('topicGetAll', arguments
);
550 * Get topic data, without content.
552 * @param {String} topicUrl
553 * @param {Boolean} applyDefaults
555 async
topicGetByUrl(dbCtx
, topicUrl
, applyDefaults
= true) {
556 this._notImplemented('topicGetByUrl', arguments
);
561 * Get topic data, without content.
564 * @param {Boolean} applyDefaults
566 async
topicGetById(dbCtx
, topicId
, applyDefaults
= true) {
567 this._notImplemented('topicGetById', arguments
);
572 * Returns topic data with content.
576 async
topicGetContentById(dbCx
, topicId
) {
577 this._notImplemented('topicGetContentById', arguments
);
582 * Attempt to delete a topic, which must be set isDeleted, if there
583 * are no more subscriptions belaying its removal.
586 async
topicPendingDelete(dbCtx
, topicId
) {
587 this._notImplemented('topicPendingDelete', arguments
);
592 * Return an array of the counts of the last #days of topic updates.
595 * @param {Number} days
596 * @returns {Number[]}
598 async
topicPublishHistory(dbCtx
, topicId
, days
) {
599 this._notImplemented('topicPublishHistory', arguments
);
604 * Create or update the basic parameters of a topic.
606 * @param {TopicData} data
608 async
topicSet(dbCtx
, data
) {
609 this._notImplemented('topicSet', arguments
);
614 * Updates a topic's content data and content update timestamp.
615 * @param {Object} data
616 * @param {*} data.topicId
617 * @param {String} data.content
618 * @param {String} data.contentHash
619 * @param {String=} data.contentType
620 * @param {String=} data.eTag
621 * @param {String=} data.lastModified
623 async
topicSetContent(dbCtx
, data
) {
624 this._notImplemented('topicSetContent', arguments
);
629 * Set some topic fields.
631 * @param {Object} data
632 * @param {*} data.topicId
633 * @param {Number=} data.leaseSecondsPreferred
634 * @param {Number=} data.leaseSecondsMin
635 * @param {Number=} data.leaseSecondsMax
636 * @param {String=} data.publisherValidationUrl
637 * @param {String=} data.contentHashAlgorithm
639 async
topicUpdate(dbCtx
, data
) {
640 this._notImplemented('topicUpdate', arguments
);
645 * Claim pending verifications for attempted resolution.
647 * @param {Integer} wanted maximum verifications to claim
648 * @param {Integer} claimTimeoutSeconds age of claimed verifications to reclaim
649 * @returns {Verification[]} array of claimed verifications
651 async
verificationClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
652 this._notImplemented('verificationClaim', arguments
);
657 * Claim a specific verification by id, if no other similar verification claimed.
659 * @param {*} verificationId
660 * @param {Number} claimTimeoutSeconds
661 * @param {String} claimant
663 async
verificationClaimById(dbCtx
, verificationId
, claimTimeoutSeconds
, claimant
) {
664 this._notImplemented('verificationClaimById', arguments
);
669 * Remove the verification, any older
670 * verifications for that same client/topic, and the claim.
672 * @param {*} verificationId
673 * @param {String} callback
676 async
verificationComplete(dbCtx
, verificationId
, callback
, topicId
) {
677 this._notImplemented('verificationComplete', arguments
);
682 * Get verification data.
684 * @param {*} verificationId
686 async
verificationGetById(dbCtx
, verificationId
) {
687 this._notImplemented('verificationGetById', arguments
);
692 * Update database that a client verification was unable to complete.
693 * This releases the delivery claim and reschedules for some future time.
695 * @param {String} callback client callback url
696 * @param {*} topicId internal topic id
697 * @param {Number[]} retryDelays
699 async
verificationIncomplete(dbCtx
, verificationId
, retryDelays
) {
700 this._notImplemented('verificationIncomplete', arguments
);
705 * Create a new pending verification.
707 * @param {VerificationData} data
708 * @param {Boolean} claim
709 * @returns {*} verificationId
711 async
verificationInsert(dbCtx
, verification
) {
712 this._notImplemented('verificationInsert', arguments
);
717 * Relinquish the claim on a verification, without any other updates.
719 * @param {String} callback client callback url
720 * @param {*} topicId internal topic id
722 async
verificationRelease(dbCtx
, verificationId
) {
723 this._notImplemented('verificationRelease', arguments
);
728 * Updates some fields of an existing (presumably claimed) verification.
730 * @param {*} verificationId
731 * @param {Object} data
732 * @param {String} data.mode
733 * @param {String} data.reason
734 * @param {Boolean} data.isPublisherValidated
736 async
verificationUpdate(dbCtx
, verificationId
, data
) {
737 this._notImplemented('verificationUpdate', arguments
);
742 * Sets the isPublisherValidated flag on a verification and resets the delivery
744 * @param {*} verificationId
746 async
verificationValidated(dbCtx
, verificationId
) {
747 this._notImplemented('verificationValidated', arguments
);
752 module
.exports
= Database
;