/* eslint-disable no-unused-vars */

/**
 * This is the semi-abstract database class, providing interface and utility methods.
 */
8 const common
= require('../common');
9 const DBErrors
= require('./errors');
10 const svh
= require('./schema-version-helper');
12 const _fileScope
= common
.fileScope(__filename
);
/**
 * Semi-abstract database class.
 *
 * Provides shared helpers (name conversion, argument validation, lease
 * defaults) and declares the interface every engine is expected to implement.
 * Each un-implemented interface method logs an error and throws
 * DBErrors.NotImplemented via _notImplemented(); for the async methods this
 * surfaces as a rejected promise.
 */
class Database {
  /**
   * @param {object} logger logger object, defaults to common.nullLogger
   * @param {object} options options
   * @param {object=} options.topicLeaseDefaults lease default overrides, handed to
   *   common.setOptions() together with common.topicLeaseDefaults()
   */
  constructor(logger = common.nullLogger, options = {}) {
    this.logger = logger;
    common.ensureLoggerLevels(this.logger);

    // Store the merged config and default values for lease values.
    // N.B. breaking hierarchy of config options here
    this.topicLeaseDefaults = {};
    common.setOptions(this.topicLeaseDefaults, common.topicLeaseDefaults(), options.topicLeaseDefaults || {});
  }


  /**
   * Turn a snake into a camel.
   * Used when translating SQL column names to JS object style.
   * @param {string} snakeCase snake case string
   * @param {string | RegExp} delimiter default '_'
   * @returns {string | undefined} camelCaseString, or undefined if the input
   *   is falsy or has no split() method
   */
  static _camelfy(snakeCase, delimiter = '_') {
    if (!snakeCase || typeof snakeCase.split !== 'function') {
      return undefined;
    }
    const words = snakeCase.split(delimiter);
    // First word is kept verbatim; every following word gets its first letter upper-cased.
    return [
      words.shift(),
      ...words.map((word) => word.charAt(0).toUpperCase() + word.slice(1)),
    ].join('');
  }


  /**
   * Basic type checking of object properties.
   * In addition to plain typeof names, `types` may list 'buffer' (matches
   * Buffer instances) and 'null' (matches null); a bigint value is accepted
   * wherever 'number' is listed.
   * @param {object} object object
   * @param {string[]} properties list of property names
   * @param {string[]} types list of valid types for property names
   * @throws {DBErrors.DataValidation} on a missing argument or a type mismatch
   */
  _ensureTypes(object, properties, types) {
    const _scope = _fileScope('_ensureTypes');

    if (!(object && properties && types)) {
      this.logger.error(_scope, 'undefined argument', { object, properties, types });
      throw new DBErrors.DataValidation();
    }
    properties.forEach((p) => {
      // eslint-disable-next-line security/detect-object-injection
      const pObj = object[p];
      const pType = typeof pObj;
      const isAcceptable = types.includes(pType)
        || (pObj instanceof Buffer && types.includes('buffer'))
        || (pObj === null && types.includes('null'))
        || (pType === 'bigint' && types.includes('number'));
      if (!isAcceptable) {
        const reason = `'${p}' is '${pType}', but must be ${types.length > 1 ? 'one of ' : ''}'${types}'`;
        this.logger.error(_scope, reason, {});
        throw new DBErrors.DataValidation(reason);
      }
    });
  }


  /**
   * Interface methods need implementations.
   * Logs the call, then throws.
   * @param {string} method method name
   * @param {arguments} args arguments
   * @throws {DBErrors.NotImplemented} always
   */
  _notImplemented(method, args) {
    this.logger.error(_fileScope(method), 'abstract method called', Array.from(args));
    throw new DBErrors.NotImplemented(method);
  }


  /**
   * Perform tasks needed to prepare database for use.  Ensure this is called
   * after construction, and before any other database activity.
   * At the minimum, this will validate a compatible schema is present and usable.
   * Some engines will also perform other initializations or async actions which
   * are easier handled outside the constructor.
   * Relies on this.schemaVersionsSupported ({ min, max }), which this class
   * itself does not set.
   * @returns {Promise<void>}
   * @throws {DBErrors.MigrationNeeded} when the current schema is outside the supported range
   */
  async initialize() {
    const _scope = _fileScope('initialize');

    const currentSchema = await this._currentSchema();
    const current = svh.schemaVersionObjectToNumber(currentSchema);
    const min = svh.schemaVersionObjectToNumber(this.schemaVersionsSupported.min);
    const max = svh.schemaVersionObjectToNumber(this.schemaVersionsSupported.max);
    if (current >= min && current <= max) {
      this.logger.debug(_scope, 'schema supported', { currentSchema, schemaVersionsSupported: this.schemaVersionsSupported });
    } else {
      this.logger.error(_scope, 'schema not supported', { currentSchema, schemaVersionsSupported: this.schemaVersionsSupported });
      throw new DBErrors.MigrationNeeded();
    }
  }


  /**
   * Perform db connection healthcheck.
   * @returns {Promise<void>}
   */
  async healthCheck() {
    this._notImplemented('healthCheck', arguments);
  }


  /**
   * Replace any NULL from topic DB entry with default values.
   * Keys of this.topicLeaseDefaults which are absent from, or null in, the
   * topic are filled in.  The topic object is modified in place.
   * @param {object} topic topic entry
   * @returns {object} updated topic entry (the same object; falsy input is returned untouched)
   */
  _topicDefaults(topic) {
    if (topic) {
      for (const [key, value] of Object.entries(this.topicLeaseDefaults)) {
        // eslint-disable-next-line security/detect-object-injection
        if (!(key in topic) || topic[key] === null) {
          // eslint-disable-next-line security/detect-object-injection
          topic[key] = value;
        }
      }
    }
    return topic;
  }


  /**
   * Ensures any lease durations in data are consistent.
   * @param {object} data topic data
   * @throws {DBErrors.DataValidation} on wrong type, non-positive value, or min/preferred/max out of order
   */
  _leaseDurationsValidate(data) {
    const leaseProperties = Object.keys(this.topicLeaseDefaults);
    this._ensureTypes(data, leaseProperties, ['number', 'undefined', 'null']);

    // Populate defaults on a copy of values so we can check proper numerical ordering
    const leaseValues = common.pick(data, leaseProperties);
    this._topicDefaults(leaseValues);
    for (const [prop, value] of Object.entries(leaseValues)) {
      if (value <= 0) {
        throw new DBErrors.DataValidation(`${prop} must be positive`);
      }
    }
    if (!(leaseValues.leaseSecondsMin <= leaseValues.leaseSecondsPreferred && leaseValues.leaseSecondsPreferred <= leaseValues.leaseSecondsMax)) {
      throw new DBErrors.DataValidation('lease durations violate numerical ordering');
    }
  }


  /**
   * Basic field validation for setting topic data.
   * @param {object} data topic data
   * @throws {DBErrors.DataValidation} on invalid data
   */
  _topicSetDataValidate(data) {
    this._ensureTypes(data, ['url'], ['string']);
    this._ensureTypes(data, ['publisherValidationUrl'], ['string', 'undefined', 'null']);
    this._leaseDurationsValidate(data);
  }


  /**
   * Basic field validation for setting topic content.
   * @param {object} data topic data
   * @throws {DBErrors.DataValidation} on invalid data
   */
  _topicSetContentDataValidate(data) {
    this._ensureTypes(data, ['content'], ['string', 'buffer']);
    this._ensureTypes(data, ['contentHash'], ['string']);
    this._ensureTypes(data, ['contentType'], ['string', 'null', 'undefined']);
    this._ensureTypes(data, ['eTag'], ['string', 'null', 'undefined']);
    this._ensureTypes(data, ['lastModified'], ['string', 'null', 'undefined']);
  }


  /**
   * Basic field validation for updating topic.
   * @param {object} data topic data
   * @throws {DBErrors.DataValidation} on invalid data
   */
  _topicUpdateDataValidate(data) {
    this._ensureTypes(data, ['publisherValidationUrl'], ['string', 'undefined', 'null']);
    if (data.publisherValidationUrl) {
      try {
        // Constructed only for its parse side effect: URL throws on malformed input.
        new URL(data.publisherValidationUrl);
      } catch (e) {
        throw new DBErrors.DataValidation('invalid URL format');
      }
    }
    this._ensureTypes(data, ['contentHashAlgorithm'], ['string']);
    if (!common.validHash(data.contentHashAlgorithm)) {
      throw new DBErrors.DataValidation('unsupported hash algorithm');
    }
    this._leaseDurationsValidate(data);
  }


  /**
   * Basic field validation for setting verification data.
   * @param {object} data verification data
   * @throws {DBErrors.DataValidation} on invalid data
   */
  _verificationDataValidate(data) {
    this._ensureTypes(data, ['topicId'], ['string', 'number']);
    this._ensureTypes(data, ['callback', 'mode'], ['string']);
    this._ensureTypes(data, ['secret', 'httpRemoteAddr', 'httpFrom', 'requestId'], ['string', 'null', 'undefined']);
    this._ensureTypes(data, ['leaseSeconds'], ['number']);
    this._ensureTypes(data, ['isPublisherValidated'], ['boolean']);
  }


  /**
   * Basic field validation for updating verification data.
   * @param {object} data verification data
   * @throws {DBErrors.DataValidation} on invalid data
   */
  _verificationUpdateDataValidate(data) {
    this._ensureTypes(data, ['verificationId'], ['string', 'number']);
    this._ensureTypes(data, ['mode'], ['string']);
    this._ensureTypes(data, ['reason'], ['string', 'null', 'undefined']);
    this._ensureTypes(data, ['isPublisherValidated'], ['boolean']);
  }


  /**
   * Basic field validation for upserting subscription data.
   * @param {object} data subscription data
   * @throws {DBErrors.DataValidation} on invalid data
   */
  _subscriptionUpsertDataValidate(data) {
    this._ensureTypes(data, ['topicId'], ['string', 'number']);
    this._ensureTypes(data, ['callback'], ['string']);
    this._ensureTypes(data, ['leaseSeconds'], ['number']);
    this._ensureTypes(data, ['secret', 'httpRemoteAddr', 'httpFrom'], ['string', 'null', 'undefined']);
  }


  /**
   * Basic field validation for subscription update data.
   * @param {object} data subscription data
   * @throws {DBErrors.DataValidation} on invalid data
   */
  _subscriptionUpdateDataValidate(data) {
    this._ensureTypes(data, ['signatureAlgorithm'], ['string', 'null', 'undefined']);
    // NOTE(review): null/undefined pass the type check above but are still handed
    // to common.validHash(); whether that accepts them is not visible here -- confirm.
    if (!common.validHash(data.signatureAlgorithm)) {
      throw new DBErrors.DataValidation('unsupported hash algorithm');
    }
  }


  /* Interface methods */

  /**
   * @typedef {object} CommonDBInfo
   * @property {number} changes result changes
   * @property {*} lastInsertRowid result row id
   * @property {number} duration result duration
   */
  /**
   * Normalize query information to a common form from a specific backend.
   * @param {*} result db result
   */
  _engineInfo(result) {
    this._notImplemented('_engineInfo', arguments);
  }


  /**
   * @typedef {object} SchemaVersion
   * @property {number} major semver major
   * @property {number} minor semver minor
   * @property {number} patch semver patch
   */
  /**
   * Query the current schema version.
   * This is a standalone query function, as it is called before statements are loaded.
   * @returns {Promise<SchemaVersion>} schema version
   */
  async _currentSchema() {
    this._notImplemented('_currentSchema', arguments);
  }


  /**
   * Wrap a function call in a database context.
   * @param {Function} fn fn(ctx)
   */
  async context(fn) {
    this._notImplemented('context', arguments);
  }


  /**
   * Wrap a function call in a transaction context.
   * @param {*} dbCtx db context
   * @param {Function} fn fn(txCtx)
   */
  async transaction(dbCtx, fn) {
    this._notImplemented('transaction', arguments);
  }


  /**
   * Store an authentication success event.
   * @param {*} dbCtx db context
   * @param {string} identifier authentication identifier
   */
  async authenticationSuccess(dbCtx, identifier) {
    this._notImplemented('authenticationSuccess', arguments);
  }


  /**
   * Fetch authentication data for identifier.
   * @param {*} dbCtx db context
   * @param {*} identifier authentication identifier
   */
  async authenticationGet(dbCtx, identifier) {
    this._notImplemented('authenticationGet', arguments);
  }


  /**
   * Create or update an authentication entity.
   * @param {*} dbCtx db context
   * @param {string} identifier authentication identifier
   * @param {string} credential authentication credential
   */
  async authenticationUpsert(dbCtx, identifier, credential) {
    this._notImplemented('authenticationUpsert', arguments);
  }


  /**
   * All subscriptions to a topic.
   * @param {*} dbCtx db context
   * @param {string} topicId topic id
   */
  async subscriptionsByTopicId(dbCtx, topicId) {
    this._notImplemented('subscriptionsByTopicId', arguments);
  }


  /**
   * Number of subscriptions to a topic.
   * @param {*} dbCtx db context
   * @param {string} topicUrl topic url
   */
  async subscriptionCountByTopicUrl(dbCtx, topicUrl) {
    this._notImplemented('subscriptionCountByTopicUrl', arguments);
  }


  /**
   * Remove an existing subscription.
   * @param {*} dbCtx db context
   * @param {string} callback subscriber callback url
   * @param {*} topicId topic id
   */
  async subscriptionDelete(dbCtx, callback, topicId) {
    this._notImplemented('subscriptionDelete', arguments);
  }


  /**
   * Remove any expired subscriptions to a topic.
   * @param {*} dbCtx db context
   * @param {*} topicId topic id
   */
  async subscriptionDeleteExpired(dbCtx, topicId) {
    this._notImplemented('subscriptionDeleteExpired', arguments);
  }


  /**
   * @typedef {number} Integer
   */
  /**
   * Claim subscriptions needing content updates attempted.
   * @param {*} dbCtx db context
   * @param {number} wanted maximum subscription updates to claim
   * @param {Integer} claimTimeoutSeconds age of claimed updates to reclaim
   * @param {string} claimant worker claiming processing
   * @returns {Promise<Array>} list of subscriptions
   */
  async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
    this._notImplemented('subscriptionDeliveryClaim', arguments);
  }


  /**
   * Claim a subscription delivery.
   * @param {*} dbCtx db context
   * @param {*} subscriptionId subscription id
   * @param {number} claimTimeoutSeconds duration of claim
   * @param {*} claimant worker claiming processing
   */
  async subscriptionDeliveryClaimById(dbCtx, subscriptionId, claimTimeoutSeconds, claimant) {
    this._notImplemented('subscriptionDeliveryClaimById', arguments);
  }


  /**
   * A subscriber successfully received new topic content, update subscription.
   * @param {*} dbCtx db context
   * @param {string} callback subscriber callback url
   * @param {*} topicId topic id
   */
  async subscriptionDeliveryComplete(dbCtx, callback, topicId) {
    this._notImplemented('subscriptionDeliveryComplete', arguments);
  }


  /**
   * A subscriber denied new topic content, remove subscription.
   * @param {*} dbCtx db context
   * @param {string} callback subscriber callback url
   * @param {*} topicId topic id
   */
  async subscriptionDeliveryGone(dbCtx, callback, topicId) {
    this._notImplemented('subscriptionDeliveryGone', arguments);
  }


  /**
   * An attempt to deliver content to a subscriber did not complete, update delivery accordingly.
   * @param {*} dbCtx db context
   * @param {string} callback subscriber callback url
   * @param {*} topicId topic id
   * @param {number[]} retryDelays list of retry delays
   */
  async subscriptionDeliveryIncomplete(dbCtx, callback, topicId, retryDelays) {
    this._notImplemented('subscriptionDeliveryIncomplete', arguments);
  }


  /**
   * Fetch subscription details, by callback and topic.
   * @param {*} dbCtx db context
   * @param {string} callback subscriber callback url
   * @param {*} topicId topic id
   */
  async subscriptionGet(dbCtx, callback, topicId) {
    this._notImplemented('subscriptionGet', arguments);
  }


  /**
   * Fetch subscription details, by subscription id.
   * @param {*} dbCtx db context
   * @param {*} subscriptionId subscription id
   */
  async subscriptionGetById(dbCtx, subscriptionId) {
    this._notImplemented('subscriptionGetById', arguments);
  }


  /**
   * Set subscription details.
   * @param {*} dbCtx db context
   * @param {object} data subscription data
   * @param {string} data.callback subscriber callback url
   * @param {*} data.topicId topic id
   * @param {number} data.leaseSeconds lease seconds
   * @param {string=} data.secret secret
   * @param {string=} data.httpRemoteAddr subscriber info
   * @param {string=} data.httpFrom subscriber info
   */
  async subscriptionUpsert(dbCtx, data) {
    this._notImplemented('subscriptionUpsert', arguments);
  }


  /**
   * Set some subscription fields.
   * @param {*} dbCtx db context
   * @param {object} data subscription data
   * @param {*} data.subscriptionId subscription id
   * @param {string} data.signatureAlgorithm signature algorithm
   */
  async subscriptionUpdate(dbCtx, data) {
    this._notImplemented('subscriptionUpdate', arguments);
  }


  /**
   * Sets the isDeleted flag on a topic, and reset update time.
   * @param {*} dbCtx db context
   * @param {*} topicId topic id
   */
  async topicDeleted(dbCtx, topicId) {
    this._notImplemented('topicDeleted', arguments);
  }


  /**
   * Claim topics to fetch updates for, from available.
   * @param {*} dbCtx db context
   * @param {Integer} wanted maximum topic fetches to claim
   * @param {Integer} claimTimeoutSeconds age of claimed topics to reclaim
   * @param {string} claimant node id claiming these fetches
   */
  async topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
    this._notImplemented('topicFetchClaim', arguments);
  }


  /**
   * Claim a topic to update.
   * @param {*} dbCtx db context
   * @param {*} topicId topic id
   * @param {Integer} claimTimeoutSeconds age of claimed topics to reclaim
   * @param {string} claimant node id claiming these fetches
   */
  async topicFetchClaimById(dbCtx, topicId, claimTimeoutSeconds, claimant) {
    this._notImplemented('topicFetchClaimById', arguments);
  }


  /**
   * Reset publish state, and reset deliveries for subscribers.
   * @param {*} dbCtx db context
   * @param {*} topicId topic id
   */
  async topicFetchComplete(dbCtx, topicId) {
    this._notImplemented('topicFetchComplete', arguments);
  }


  /**
   * Bump count of attempts and release claim on update.
   * @param {*} dbCtx db context
   * @param {*} topicId topic id
   * @param {number[]} retryDelays retry delays
   */
  async topicFetchIncomplete(dbCtx, topicId, retryDelays) {
    this._notImplemented('topicFetchIncomplete', arguments);
  }


  /**
   * Set a topic as ready to be checked for an update.
   * @param {*} dbCtx db context
   * @param {*} topicId topic id
   */
  async topicFetchRequested(dbCtx, topicId) {
    this._notImplemented('topicFetchRequested', arguments);
  }


  /**
   * Get all data for all topics, including subscription count.
   * @param {*} dbCtx db context
   */
  async topicGetAll(dbCtx) {
    this._notImplemented('topicGetAll', arguments);
  }


  /**
   * Get topic data, without content, by url.
   * @param {*} dbCtx db context
   * @param {string} topicUrl topic url
   * @param {boolean} applyDefaults merge defaults into result
   */
  async topicGetByUrl(dbCtx, topicUrl, applyDefaults = true) {
    this._notImplemented('topicGetByUrl', arguments);
  }


  /**
   * Get topic data, without content, by id.
   * @param {*} dbCtx db context
   * @param {*} topicId topic id
   * @param {boolean} applyDefaults merge defaults into result
   */
  async topicGetById(dbCtx, topicId, applyDefaults = true) {
    this._notImplemented('topicGetById', arguments);
  }


  /**
   * Returns topic data with content.
   * @param {*} dbCtx db context
   * @param {*} topicId topic id
   */
  async topicGetContentById(dbCtx, topicId) {
    this._notImplemented('topicGetContentById', arguments);
  }


  /**
   * Attempt to delete a topic, which must be set isDeleted, if there
   * are no more subscriptions belaying its removal.
   * @param {*} dbCtx db context
   * @param {*} topicId topic id
   */
  async topicPendingDelete(dbCtx, topicId) {
    this._notImplemented('topicPendingDelete', arguments);
  }


  /**
   * Return an array of the counts of the last #days of topic updates.
   * @param {*} dbCtx db context
   * @param {*} topicId topic id
   * @param {number} days days back to count
   * @returns {Promise<number[]>} updates in last days
   */
  async topicPublishHistory(dbCtx, topicId, days) {
    this._notImplemented('topicPublishHistory', arguments);
  }


  /**
   * @typedef {object} TopicData
   */
  /**
   * Create or update the basic parameters of a topic.
   * @param {*} dbCtx db context
   * @param {TopicData} data topic data
   */
  async topicSet(dbCtx, data) {
    this._notImplemented('topicSet', arguments);
  }


  /**
   * Updates a topic's content data and content update timestamp.
   * @param {*} dbCtx db context
   * @param {object} data topic data
   * @param {*} data.topicId topic id
   * @param {string} data.content content
   * @param {string} data.contentHash content hash
   * @param {string=} data.contentType content-type
   * @param {string=} data.eTag etag header
   * @param {string=} data.lastModified last modified header
   */
  async topicSetContent(dbCtx, data) {
    this._notImplemented('topicSetContent', arguments);
  }


  /**
   * Set some topic fields.
   * @param {*} dbCtx db context
   * @param {object} data topic data
   * @param {*} data.topicId topic id
   * @param {number=} data.leaseSecondsPreferred preferred topic lease seconds
   * @param {number=} data.leaseSecondsMin min lease seconds
   * @param {number=} data.leaseSecondsMax max lease seconds
   * @param {string=} data.publisherValidationUrl publisher validation url
   * @param {string=} data.contentHashAlgorithm content hash algorithm
   */
  async topicUpdate(dbCtx, data) {
    this._notImplemented('topicUpdate', arguments);
  }


  /**
   * @typedef {object} Verification
   */
  /**
   * Claim pending verifications for attempted resolution.
   * @param {*} dbCtx db context
   * @param {Integer} wanted maximum verifications to claim
   * @param {Integer} claimTimeoutSeconds age of claimed verifications to reclaim
   * @param {*} claimant worker claiming processing
   * @returns {Promise<Verification[]>} array of claimed verifications
   */
  async verificationClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
    this._notImplemented('verificationClaim', arguments);
  }


  /**
   * Claim a specific verification by id, if no other similar verification claimed.
   * @param {*} dbCtx db context
   * @param {*} verificationId verification id
   * @param {number} claimTimeoutSeconds claim duration
   * @param {string} claimant worker claiming processing
   */
  async verificationClaimById(dbCtx, verificationId, claimTimeoutSeconds, claimant) {
    this._notImplemented('verificationClaimById', arguments);
  }


  /**
   * Remove the verification, any older verifications for that same client/topic,
   * and remove the claim.
   * @param {*} dbCtx db context
   * @param {*} verificationId verification id
   * @param {string} callback subscriber callback url
   * @param {*} topicId topic id
   */
  async verificationComplete(dbCtx, verificationId, callback, topicId) {
    this._notImplemented('verificationComplete', arguments);
  }


  /**
   * Get verification data.
   * @param {*} dbCtx db context
   * @param {*} verificationId verification id
   */
  async verificationGetById(dbCtx, verificationId) {
    this._notImplemented('verificationGetById', arguments);
  }


  /**
   * Update database that a client verification was unable to complete.
   * This releases the delivery claim and reschedules for some future time.
   * @param {*} dbCtx db context
   * @param {*} verificationId verification id
   * @param {number[]} retryDelays retry delays
   */
  async verificationIncomplete(dbCtx, verificationId, retryDelays) {
    this._notImplemented('verificationIncomplete', arguments);
  }


  /**
   * @typedef {object} VerificationData
   */
  /**
   * Create a new pending verification.
   * @param {*} dbCtx db context
   * @param {VerificationData} verification verification data
   * @returns {Promise<*>} verificationId
   */
  async verificationInsert(dbCtx, verification) {
    this._notImplemented('verificationInsert', arguments);
  }


  /**
   * Relinquish the claim on a verification, without any other updates.
   * @param {*} dbCtx db context
   * @param {*} verificationId verification id
   */
  async verificationRelease(dbCtx, verificationId) {
    this._notImplemented('verificationRelease', arguments);
  }


  /**
   * Updates some fields of an existing (presumably claimed) verification.
   * @param {*} dbCtx db context
   * @param {*} verificationId verification id
   * @param {object} data verification data
   * @param {string} data.mode mode
   * @param {string} data.reason reason
   * @param {boolean} data.isPublisherValidated publisher validation result
   */
  async verificationUpdate(dbCtx, verificationId, data) {
    this._notImplemented('verificationUpdate', arguments);
  }


  /**
   * Sets the isPublisherValidated flag on a verification and resets the delivery.
   * @param {*} dbCtx db context
   * @param {*} verificationId verification id
   */
  async verificationValidated(dbCtx, verificationId) {
    this._notImplemented('verificationValidated', arguments);
  }

}
774 module
.exports
= Database
;