1 /* eslint-disable no-unused-vars */
5 * This is the semi-abstract database class, providing interface and utility methods.
8 const common
= require('../common');
9 const DBErrors
= require('./errors');
10 const svh
= require('./schema-version-helper');
12 const _fileScope
= common
.fileScope(__filename
);
15 constructor(logger
, options
= {}) {
18 // Store the merged config and default values for lease values.
19 // N.B. breaking hierarchy of config options here
20 this.topicLeaseDefaults
= {};
21 common
.setOptions(this.topicLeaseDefaults
, common
.topicLeaseDefaults(), options
.topicLeaseDefaults
|| {});
26 * Turn a snake into a camel.
27 * Used when translating SQL column names to JS object style.
28 * @param {string} snakeCase snake case string
29 * @param {string | RegExp} delimiter default '_'
30 * @returns {string} camelCaseString
32 static _camelfy(snakeCase
, delimiter
= '_') {
33 if (!snakeCase
|| typeof snakeCase
.split
!== 'function') {
36 const words
= snakeCase
.split(delimiter
);
39 ...words
.map((word
) => word
.charAt(0).toUpperCase() + word
.slice(1)),
45 * Basic type checking of object properties.
46 * @param {object} object object
47 * @param {string[]} properties list of property names
48 * @param {string[]} types list of valid types for property names
50 _ensureTypes(object
, properties
, types
) {
51 const _scope
= _fileScope('_ensureTypes');
53 if (!(object
&& properties
&& types
)) {
54 this.logger
.error(_scope
, 'undefined argument', { object
, properties
, types
});
55 throw new DBErrors
.DataValidation();
57 properties
.forEach((p
) => {
58 // eslint-disable-next-line security/detect-object-injection
59 const pObj
= object
[p
];
60 const pType
= typeof pObj
;
61 if (!types
.includes(pType
)
62 && !(pObj
instanceof Buffer
&& types
.includes('buffer'))
63 && !(pObj
=== null && types
.includes('null'))
64 && !(pType
=== 'bigint' && types
.includes('number'))) {
65 const reason
= `'${p}' is '${pType}', but must be ${types.length > 1 ? 'one of ' : ''}'${types}'`;
66 this.logger
.error(_scope
, reason
, {});
67 throw new DBErrors
.DataValidation(reason
);
74 * Interface methods need implementations.
75 * @param {string} method method name
76 * @param {arguments} args arguments
78 _notImplemented(method
, args
) {
79 this.logger
.error(_fileScope(method
), 'abstract method called', Array
.from(args
));
80 throw new DBErrors
.NotImplemented(method
);
85 * Perform tasks needed to prepare database for use. Ensure this is called
86 * after construction, and before any other database activity.
87 * At the minimum, this will validate a compatible schema is present and usable.
88 * Some engines will also perform other initializations or async actions which
89 * are easier handled outside the constructor.
90 * @returns {Promise<void>}
93 const _scope
= _fileScope('initialize');
95 const currentSchema
= await
this._currentSchema();
96 const current
= svh
.schemaVersionObjectToNumber(currentSchema
);
97 const min
= svh
.schemaVersionObjectToNumber(this.schemaVersionsSupported
.min
);
98 const max
= svh
.schemaVersionObjectToNumber(this.schemaVersionsSupported
.max
);
99 if (current
>= min
&& current
<= max
) {
100 this.logger
.debug(_scope
, 'schema supported', { currentSchema
, schemaVersionsSupported: this.schemaVersionsSupported
});
102 this.logger
.error(_scope
, 'schema not supported', { currentSchema
, schemaVersionsSupported: this.schemaVersionsSupported
});
103 throw new DBErrors
.MigrationNeeded();
109 * Perform db connection healthcheck.
110 * @returns {Promise<void>}
112 async
healthCheck() {
113 this._notImplemented('healthCheck', arguments
);
118 * Replace any NULL from topic DB entry with default values.
119 * @param {object} topic topic entry
120 * @returns {object} updated topic entry
122 _topicDefaults(topic
) {
124 for (const [key
, value
] of Object
.entries(this.topicLeaseDefaults
)) {
125 // eslint-disable-next-line security/detect-object-injection
126 if (!(key
in topic
) || topic
[key
] === null) {
127 // eslint-disable-next-line security/detect-object-injection
137 * Ensures any lease durations in data are consistent.
138 * @param {object} data topic data
140 _leaseDurationsValidate(data
) {
141 const leaseProperties
= Object
.keys(this.topicLeaseDefaults
);
142 this._ensureTypes(data
, leaseProperties
, ['number', 'undefined', 'null']);
144 // Populate defaults on a copy of values so we can check proper numerical ordering
145 const leaseValues
= common
.pick(data
, leaseProperties
);
146 this._topicDefaults(leaseValues
);
147 for (const [prop
, value
] of Object
.entries(leaseValues
)) {
149 throw new DBErrors
.DataValidation(`${prop} must be positive`);
152 if (!(leaseValues
.leaseSecondsMin
<= leaseValues
.leaseSecondsPreferred
&& leaseValues
.leaseSecondsPreferred
<= leaseValues
.leaseSecondsMax
)) {
153 throw new DBErrors
.DataValidation('lease durations violate numerical ordering');
159 * Basic field validation for setting topic data.
160 * @param {object} data topic data
162 _topicSetDataValidate(data
) {
163 this._ensureTypes(data
, ['url'], ['string']);
164 this._ensureTypes(data
, ['publisherValidationUrl'], ['string', 'undefined', 'null']);
165 this._leaseDurationsValidate(data
);
170 * Basic field validation for setting topic content.
171 * @param {object} data topic data
173 _topicSetContentDataValidate(data
) {
174 this._ensureTypes(data
, ['content'], ['string', 'buffer']);
175 this._ensureTypes(data
, ['contentHash'], ['string']);
176 this._ensureTypes(data
, ['contentType'], ['string', 'null', 'undefined']);
177 this._ensureTypes(data
, ['eTag'], ['string', 'null', 'undefined']);
178 this._ensureTypes(data
, ['lastModified'], ['string', 'null', 'undefined']);
183 * Basic field validation for updating topic.
184 * @param {object} data topic data
186 _topicUpdateDataValidate(data
) {
187 this._ensureTypes(data
, ['publisherValidationUrl'], ['string', 'undefined', 'null']);
188 if (data
.publisherValidationUrl
) {
190 new URL(data
.publisherValidationUrl
);
192 throw new DBErrors
.DataValidation('invalid URL format');
195 this._ensureTypes(data
, ['contentHashAlgorithm'], ['string']);
196 if (!common
.validHash(data
.contentHashAlgorithm
)) {
197 throw new DBErrors
.DataValidation('unsupported hash algorithm');
199 this._leaseDurationsValidate(data
);
204 * Basic field validation for setting verification data.
205 * @param {object} data topic data
207 _verificationDataValidate(data
) {
208 this._ensureTypes(data
, ['topicId'], ['string', 'number']);
209 this._ensureTypes(data
, ['callback', 'mode'], ['string']);
210 this._ensureTypes(data
, ['secret', 'httpRemoteAddr', 'httpFrom', 'requestId'], ['string', 'null', 'undefined']);
211 this._ensureTypes(data
, ['leaseSeconds'], ['number']);
212 this._ensureTypes(data
, ['isPublisherValidated'], ['boolean']);
217 * Basic field validation for updating verification data.
218 * @param {object} data verification data
220 _verificationUpdateDataValidate(data
) {
221 this._ensureTypes(data
, ['verificationId'], ['string', 'number']);
222 this._ensureTypes(data
, ['mode'], ['string']);
223 this._ensureTypes(data
, ['reason'], ['string', 'null', 'undefined']);
224 this._ensureTypes(data
, ['isPublisherValidated'], ['boolean']);
229 * Basic field validation for upserting subscription data.
230 * @param {object} data subscription data
232 _subscriptionUpsertDataValidate(data
) {
233 this._ensureTypes(data
, ['topicId'], ['string', 'number']);
234 this._ensureTypes(data
, ['callback'], ['string']);
235 this._ensureTypes(data
, ['leaseSeconds'], ['number']);
236 this._ensureTypes(data
, ['secret', 'httpRemoteAddr', 'httpFrom'], ['string', 'null', 'undefined']);
241 * Basic field validation for subscription update data.
242 * @param {object} data subscription data
244 _subscriptionUpdateDataValidate(data
) {
245 this._ensureTypes(data
, ['signatureAlgorithm'], ['string', 'null', 'undefined']);
246 if (!common
.validHash(data
.signatureAlgorithm
)) {
247 throw new DBErrors
.DataValidation('unsupported hash algorithm');
252 /* Interface methods */
255 * @typedef {object} CommonDBInfo
256 * @property {number} changes result changes
257 * @property {*} lastInsertRowid result row id
258 * @property {number} duration result duration
261 * Normalize query information to a common form from a specific backend.
262 * @param {*} result db result
264 _engineInfo(result
) {
265 this._notImplemented('engineInfo', arguments
);
270 * @typedef {object} SchemaVersion
271 * @property {number} major semver major
272 * @property {number} minor semver minor
273 * @property {number} patch semver patch
276 * Query the current schema version.
277 * This is a standalone query function, as it is called before statements are loaded.
278 * @returns {SchemaVersion} schema version
280 async
_currentSchema() {
281 this._notImplemented('_currentSchema', arguments
);
286 * Wrap a function call in a database context.
287 * @param {Function} fn fn(ctx)
290 this._notImplemented('context', arguments
);
295 * Wrap a function call in a transaction context.
296 * @param {*} dbCtx db context
297 * @param {Function} fn fn(txCtx)
299 async
transaction(dbCtx
, fn
) {
300 this._notImplemented('transaction', arguments
);
305 * Store an authentication success event.
306 * @param {*} dbCtx db context
307 * @param {string} identifier authentication identifier
309 async
authenticationSuccess(dbCtx
, identifier
) {
310 this._notImplemented('authenticationSuccess', arguments
);
315 * Fetch authentication data for identifier.
316 * @param {*} dbCtx db context
317 * @param {*} identifier authentication identifier
319 async
authenticationGet(dbCtx
, identifier
) {
320 this._notImplemented('authenticationGet', arguments
);
325 * Create or update an authentication entity.
326 * @param {*} dbCtx db context
327 * @param {string} identifier authentication identifier
328 * @param {string} credential authentication credential
329 * @param {string=} otpKey authentication otp key
331 async
authenticationUpsert(dbCtx
, identifier
, credential
, otpKey
) {
332 this._notImplemented('authenticationUpsert', arguments
);
337 * Update an authentication entity's otp key.
338 * @param {*} dbCtx db context
339 * @param {string} identifier authentication identifier
340 * @param {string=} otpKey authentication otp key
342 async
authenticationUpdateOTPKey(dbCtx
, identifier
, otpKey
) {
343 this._notImplemented('authenticationUpdateKey', arguments
);
348 * Update an authentication entity's credential.
349 * @param {*} dbCtx db context
350 * @param {string} identifier authentication identifier
351 * @param {string} credential authentication credential
353 async
authenticationUpdateCredential(dbCtx
, identifier
, credential
) {
354 this._notImplemented('authenticationUpdateKey', arguments
);
359 * All subscriptions to a topic.
360 * @param {*} dbCtx db context
361 * @param {string} topicId topic id
363 async
subscriptionsByTopicId(dbCtx
, topicId
) {
364 this._notImplemented('subscriptionsByTopicId', arguments
);
369 * Number of subscriptions to a topic.
370 * @param {*} dbCtx db context
371 * @param {string} topicUrl topic url
373 async
subscriptionCountByTopicUrl(dbCtx
, topicUrl
) {
374 this._notImplemented('subscriptionCountByTopicUrl', arguments
);
379 * Remove an existing subscription.
380 * @param {*} dbCtx db context
381 * @param {string} callback subscriber callback url
382 * @param {*} topicId topic id
384 async
subscriptionDelete(dbCtx
, callback
, topicId
) {
385 this._notImplemented('subscriptionDelete', arguments
);
390 * Remove any expired subscriptions to a topic.
391 * @param {*} dbCtx db context
392 * @param {*} topicId topic id
394 async
subscriptionDeleteExpired(dbCtx
, topicId
) {
395 this._notImplemented('subscriptionDeleteExpired', arguments
);
400 * @alias {number} Integer
403 * Claim subscriptions needing content updates attempted.
404 * @param {*} dbCtx db context
405 * @param {number} wanted maximum subscription updates to claim
406 * @param {Integer} claimTimeoutSeconds age of claimed updates to reclaim
407 * @param {string} claimant worker claiming processing
408 * @returns {Array} list of subscriptions
410 async
subscriptionDeliveryClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
411 this._notImplemented('subscriptionDeliveryClaim', arguments
);
416 * Claim a subscription delivery.
417 * @param {*} dbCtx db context
418 * @param {*} subscriptionId subscription id
419 * @param {number} claimTimeoutSeconds duration of claim
420 * @param {*} claimant worker claiming processing
422 async
subscriptionDeliveryClaimById(dbCtx
, subscriptionId
, claimTimeoutSeconds
, claimant
) {
423 this._notImplemented('subscriptionDeliveryClaimById', arguments
);
428 * A subscriber successfully received new topic content, update subscription.
429 * @param {*} dbCtx db context
430 * @param {string} callback subscriber callback url
431 * @param {*} topicId topic id
433 async
subscriptionDeliveryComplete(dbCtx
, callback
, topicId
) {
434 this._notImplemented('subscriptionDeliveryComplete', arguments
);
439 * A subscriber denied new topic content, remove subscription.
440 * @param {*} dbCtx db context
441 * @param {string} callback subscriber callback url
442 * @param {*} topicId topic id
444 async
subscriptionDeliveryGone(dbCtx
, callback
, topicId
) {
445 this._notImplemented('subscriptionDeliveryGone', arguments
);
450 * An attempt to deliver content to a subscriber did not complete, update delivery accordingly.
451 * @param {*} dbCtx db context
452 * @param {string} callback subscriber callback url
453 * @param {*} topicId topic id
454 * @param {number[]} retryDelays list of retry delays
456 async
subscriptionDeliveryIncomplete(dbCtx
, callback
, topicId
, retryDelays
) {
457 this._notImplemented('subscriptionDeliveryIncomplete', arguments
);
462 * Fetch subscription details
463 * @param {*} dbCtx db context
464 * @param {string} callback subscriber callback url
465 * @param {*} topicId topic id
467 async
subscriptionGet(dbCtx
, callback
, topicId
) {
468 this._notImplemented('subscriptionGet', arguments
);
473 * Fetch subscription details
474 * @param {*} dbCtx db context
475 * @param {*} subscriptionId subscription id
477 async
subscriptionGetById(dbCtx
, subscriptionId
) {
478 this._notImplemented('subscriptionGetById', arguments
);
483 * Set subscription details
484 * @param {*} dbCtx db context
485 * @param {object} data subscription data
486 * @param {string} data.callback subscriber callback url
487 * @param {*} data.topicId topic id
488 * @param {number} data.leaseSeconds lease seconds
489 * @param {string=} data.secret secret
490 * @param {string=} data.httpRemoteAddr subscriber info
491 * @param {string=} data.httpFrom subscriber info
493 async
subscriptionUpsert(dbCtx
, data
) {
494 this._notImplemented('subscriptionUpsert', arguments
);
499 * Set some subscription fields
500 * @param {*} dbCtx db context
501 * @param {object} data subscription data
502 * @param {*} data.subscriptionId subscription id
503 * @param {string} data.signatureAlgorithm signature algorithm
505 async
subscriptionUpdate(dbCtx
, data
) {
506 this._notImplemented('subscriptionUpdate', arguments
);
511 * Sets the isDeleted flag on a topic, and reset update time.
512 * @param {*} dbCtx db context
513 * @param {*} topicId topic id
515 async
topicDeleted(dbCtx
, topicId
) {
516 this._notImplemented('topicDeleted', arguments
);
521 * Claim topics to fetch updates for, from available.
522 * @param {*} dbCtx db context
523 * @param {Integer} wanted maximum topic fetches to claim
524 * @param {Integer} claimTimeoutSeconds age of claimed topics to reclaim
525 * @param {string} claimant node id claiming these fetches
527 async
topicFetchClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
528 this._notImplemented('topicFetchClaim', arguments
);
533 * Claim a topic to update.
534 * @param {*} dbCtx db context
535 * @param {*} topicId topic id
536 * @param {Integer} claimTimeoutSeconds age of claimed topics to reclaim
537 * @param {string} claimant node id claiming these fetches
539 async
topicFetchClaimById(dbCtx
, topicId
, claimTimeoutSeconds
, claimant
) {
540 this._notImplemented('topicFetchClaim', arguments
);
545 * Reset publish state, and reset deliveries for subscribers.
546 * @param {*} dbCtx db context
547 * @param {*} topicId topic id
549 async
topicFetchComplete(dbCtx
, topicId
) {
550 this._notImplemented('topicFetchComplete', arguments
);
555 * Bump count of attempts and release claim on update.
556 * @param {*} dbCtx db context
557 * @param {*} topicId topic id
558 * @param {number[]} retryDelays retry delays
560 async
topicFetchIncomplete(dbCtx
, topicId
, retryDelays
) {
561 this._notImplemented('topicFetchIncomplete', arguments
);
566 * Set a topic as ready to be checked for an update.
567 * @param {*} dbCtx db context
568 * @param {*} topicId topic id
570 async
topicFetchRequested(dbCtx
, topicId
) {
571 this._notImplemented('topicPublish', arguments
);
576 * Get all data for all topics, including subscription count.
577 * @param {*} dbCtx db context
579 async
topicGetAll(dbCtx
) {
580 this._notImplemented('topicGetAll', arguments
);
585 * Get topic data, without content.
586 * @param {*} dbCtx db context
587 * @param {string} topicUrl topic url
588 * @param {boolean} applyDefaults merge defaults into result
590 async
topicGetByUrl(dbCtx
, topicUrl
, applyDefaults
= true) {
591 this._notImplemented('topicGetByUrl', arguments
);
596 * Get topic data, without content.
597 * @param {*} dbCtx db context
598 * @param {*} topicId topic id
599 * @param {boolean} applyDefaults merge defaults into result
601 async
topicGetById(dbCtx
, topicId
, applyDefaults
= true) {
602 this._notImplemented('topicGetById', arguments
);
607 * Returns topic data with content.
608 * @param {*} dbCtx db context
609 * @param {*} topicId topic id
611 async
topicGetContentById(dbCtx
, topicId
) {
612 this._notImplemented('topicGetContentById', arguments
);
617 * Attempt to delete a topic, which must be set isDeleted, if there
618 * are no more subscriptions belaying its removal.
619 * @param {*} dbCtx db context
620 * @param {*} topicId topic id
622 async
topicPendingDelete(dbCtx
, topicId
) {
623 this._notImplemented('topicPendingDelete', arguments
);
628 * Return an array of the counts of the last #days of topic updates.
629 * @param {*} dbCtx db context
630 * @param {*} topicId topic id
631 * @param {number} days days back to count
632 * @returns {number[]} updates in last days
634 async
topicPublishHistory(dbCtx
, topicId
, days
) {
635 this._notImplemented('topicPublishHistory', arguments
);
640 * @alias {object} TopicData
643 * Create or update the basic parameters of a topic.
644 * @param {*} dbCtx db context
645 * @param {TopicData} data topic data
647 async
topicSet(dbCtx
, data
) {
648 this._notImplemented('topicSet', arguments
);
653 * Updates a topic's content data and content update timestamp.
654 * @param {*} dbCtx db context
655 * @param {object} data topic data
656 * @param {*} data.topicId topic id
657 * @param {string} data.content content
658 * @param {string} data.contentHash content hash
659 * @param {string=} data.contentType content-type
660 * @param {string=} data.eTag etag header
661 * @param {string=} data.lastModified last modified header
663 async
topicSetContent(dbCtx
, data
) {
664 this._notImplemented('topicSetContent', arguments
);
669 * Set some topic fields.
670 * @param {*} dbCtx db context
671 * @param {object} data topic data
672 * @param {*} data.topicId topic id
673 * @param {number=} data.leaseSecondsPreferred preferred topic lease seconds
674 * @param {number=} data.leaseSecondsMin min lease seconds
675 * @param {number=} data.leaseSecondsMax max lease seconds
676 * @param {string=} data.publisherValidationUrl publisher validation url
677 * @param {string=} data.contentHashAlgorithm content hash algorithm
679 async
topicUpdate(dbCtx
, data
) {
680 this._notImplemented('topicUpdate', arguments
);
685 * @alias {object} Verification
688 * Claim pending verifications for attempted resolution.
689 * @param {*} dbCtx db context
690 * @param {Integer} wanted maximum verifications to claim
691 * @param {Integer} claimTimeoutSeconds age of claimed verifications to reclaim
692 * @param {*} claimant worker claiming processing
693 * @returns {Verification[]} array of claimed verifications
695 async
verificationClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
696 this._notImplemented('verificationClaim', arguments
);
701 * Claim a specific verification by id, if no other similar verification claimed.
702 * @param {*} dbCtx db context
703 * @param {*} verificationId verification id
704 * @param {number} claimTimeoutSeconds claim duration
705 * @param {string} claimant worker claiming processing
707 async
verificationClaimById(dbCtx
, verificationId
, claimTimeoutSeconds
, claimant
) {
708 this._notImplemented('verificationClaimById', arguments
);
713 * Remove the verification, any older verifications for that same client/topic,
714 * and remove the claim.
715 * @param {*} dbCtx db context
716 * @param {*} verificationId verification id
717 * @param {string} callback subscriber callback url
718 * @param {*} topicId topic id
720 async
verificationComplete(dbCtx
, verificationId
, callback
, topicId
) {
721 this._notImplemented('verificationComplete', arguments
);
726 * Get verification data.
727 * @param {*} dbCtx db context
728 * @param {*} verificationId verification id
730 async
verificationGetById(dbCtx
, verificationId
) {
731 this._notImplemented('verificationGetById', arguments
);
736 * Update database that a client verification was unable to complete.
737 * This releases the delivery claim and reschedules for some future time.
738 * @param {*} dbCtx db context
739 * @param {*} verificationId verification id
740 * @param {number[]} retryDelays retry delays
742 async
verificationIncomplete(dbCtx
, verificationId
, retryDelays
) {
743 this._notImplemented('verificationIncomplete', arguments
);
748 * @alias {object} VerificationData
751 * Create a new pending verification.
752 * @param {*} dbCtx db context
753 * @param {VerificationData} verification verification data
754 * @returns {*} verificationId
756 async
verificationInsert(dbCtx
, verification
) {
757 this._notImplemented('verificationInsert', arguments
);
762 * Relinquish the claim on a verification, without any other updates.
763 * @param {*} dbCtx db context
764 * @param {*} verificationId verification id
766 async
verificationRelease(dbCtx
, verificationId
) {
767 this._notImplemented('verificationRelease', arguments
);
772 * Updates some fields of an existing (presumably claimed) verification.
773 * @param {*} dbCtx db context
774 * @param {*} verificationId verification id
775 * @param {object} data verification data
776 * @param {string} data.mode mode
777 * @param {string} data.reason reason
778 * @param {boolean} data.isPublisherValidated publisher validation result
780 async
verificationUpdate(dbCtx
, verificationId
, data
) {
781 this._notImplemented('verificationUpdate', arguments
);
786 * Sets the isPublisherValidated flag on a verification and resets the delivery
787 * @param {*} dbCtx db context
788 * @param {*} verificationId verification id
790 async
verificationValidated(dbCtx
, verificationId
) {
791 this._notImplemented('verificationValidated', arguments
);
796 module
.exports
= Database
;