1 /* eslint-disable no-unused-vars */
5 * This is the semi-abstract database class, providing interface and utility methods.
8 const common
= require('../common');
9 const DBErrors
= require('./errors');
10 const svh
= require('./schema-version-helper');
12 const _fileScope
= common
.fileScope(__filename
);
15 constructor(logger
= common
.nullLogger
, options
= {}) {
17 common
.ensureLoggerLevels(this.logger
);
19 // Store the merged config and default values for lease values.
20 // N.B. breaking hierarchy of config options here
21 this.topicLeaseDefaults
= {};
22 common
.setOptions(this.topicLeaseDefaults
, common
.topicLeaseDefaults(), options
.topicLeaseDefaults
|| {});
27 * Turn a snake into a camel.
28 * Used when translating SQL column names to JS object style.
29 * @param {String} snakeCase
30 * @param {String|RegExp} delimiter
33 static _camelfy(snakeCase
, delimiter
= '_') {
34 if (!snakeCase
|| typeof snakeCase
.split
!== 'function') {
37 const words
= snakeCase
.split(delimiter
);
40 ...words
.map((word
) => word
.charAt(0).toUpperCase() + word
.slice(1)),
46 * Basic type checking of object properties.
47 * @param {Object} object
48 * @param {String[]} properties
49 * @param {String[]} types
51 _ensureTypes(object
, properties
, types
) {
52 const _scope
= _fileScope('_ensureTypes');
54 if (!(object
&& properties
&& types
)) {
55 this.logger
.error(_scope
, 'undefined argument', { object
, properties
, types
});
56 throw new DBErrors
.DataValidation();
58 properties
.forEach((p
) => {
59 // eslint-disable-next-line security/detect-object-injection
60 const pObj
= object
[p
];
61 const pType
= typeof pObj
;
62 if (!types
.includes(pType
)
63 && !(pObj
instanceof Buffer
&& types
.includes('buffer'))
64 && !(pObj
=== null && types
.includes('null'))
65 && !(pType
=== 'bigint' && types
.includes('number'))) {
66 const reason
= `'${p}' is '${pType}', but must be ${types.length > 1 ? 'one of ' : ''}'${types}'`;
67 this.logger
.error(_scope
, reason
, {});
68 throw new DBErrors
.DataValidation(reason
);
75 * Interface methods need implementations.
76 * @param {String} method
77 * @param {arguments} args
79 _notImplemented(method
, args
) {
80 this.logger
.error(_fileScope(method
), 'abstract method called', Array
.from(args
));
81 throw new DBErrors
.NotImplemented(method
);
86 * Perform tasks needed to prepare database for use. Ensure this is called
87 * after construction, and before any other database activity.
88 * At the minimum, this will validate a compatible schema is present and usable.
89 * Some engines will also perform other initializations or async actions which
90 * are easier handled outside the constructor.
93 const _scope
= _fileScope('initialize');
95 const currentSchema
= await
this._currentSchema();
96 const current
= svh
.schemaVersionObjectToNumber(currentSchema
);
97 const min
= svh
.schemaVersionObjectToNumber(this.schemaVersionsSupported
.min
);
98 const max
= svh
.schemaVersionObjectToNumber(this.schemaVersionsSupported
.max
);
99 if (current
>= min
&& current
<= max
) {
100 this.logger
.debug(_scope
, 'schema supported', { currentSchema
, schemaVersionsSupported: this.schemaVersionsSupported
});
102 this.logger
.error(_scope
, 'schema not supported', { currentSchema
, schemaVersionsSupported: this.schemaVersionsSupported
});
103 throw new DBErrors
.MigrationNeeded();
109 * Perform db connection healthcheck.
111 async
healthCheck() {
112 this._notImplemented('healthCheck', arguments
);
117 * Replace any NULL from topic DB entry with default values.
118 * @param {Object} topic
121 _topicDefaults(topic
) {
123 for (const [key
, value
] of Object
.entries(this.topicLeaseDefaults
)) {
124 // eslint-disable-next-line security/detect-object-injection
125 if (!(key
in topic
) || topic
[key
] === null) {
126 // eslint-disable-next-line security/detect-object-injection
136 * Ensures any lease durations in data are consistent.
137 * @param {Object} data
139 _leaseDurationsValidate(data
) {
140 const leaseProperties
= Object
.keys(this.topicLeaseDefaults
)
141 this._ensureTypes(data
, leaseProperties
, ['number', 'undefined', 'null']);
143 // Populate defaults on a copy of values so we can check proper numerical ordering
144 const leaseValues
= common
.pick(data
, leaseProperties
);
145 this._topicDefaults(leaseValues
);
146 for (const [prop
, value
] of Object
.entries(leaseValues
)) {
148 throw new DBErrors
.DataValidation(`${prop} must be positive`);
151 if (!(leaseValues
.leaseSecondsMin
<= leaseValues
.leaseSecondsPreferred
&& leaseValues
.leaseSecondsPreferred
<= leaseValues
.leaseSecondsMax
)) {
152 throw new DBErrors
.DataValidation('lease durations violate numerical ordering');
158 * Basic field validation for setting topic data.
159 * @param {Object} data
161 _topicSetDataValidate(data
) {
162 this._ensureTypes(data
, ['url'], ['string']);
163 this._ensureTypes(data
, ['publisherValidationUrl'], ['string', 'undefined', 'null']);
164 this._leaseDurationsValidate(data
);
169 * Basic field validation for setting topic content.
170 * @param {Object} data
172 _topicSetContentDataValidate(data
) {
173 this._ensureTypes(data
, ['content'], ['string', 'buffer']);
174 this._ensureTypes(data
, ['contentHash'], ['string']);
175 this._ensureTypes(data
, ['contentType'], ['string', 'null', 'undefined']);
180 * Basic field validation for updating topic.
181 * @param {Object} data
183 _topicUpdateDataValidate(data
) {
184 this._ensureTypes(data
, ['publisherValidationUrl'], ['string', 'undefined', 'null']);
185 if (data
.publisherValidationUrl
) {
187 new URL(data
.publisherValidationUrl
);
189 throw new DBErrors
.DataValidation('invalid URL format');
192 this._ensureTypes(data
, ['contentHashAlgorithm'], ['string']);
193 if (!common
.validHash(data
.contentHashAlgorithm
)) {
194 throw new DBErrors
.DataValidation('unsupported hash algorithm');
196 this._leaseDurationsValidate(data
);
201 * Basic field validation for setting verification data.
202 * @param {Object} data
204 _verificationDataValidate(data
) {
205 this._ensureTypes(data
, ['topicId'], ['string', 'number']);
206 this._ensureTypes(data
, ['callback', 'mode'], ['string']);
207 this._ensureTypes(data
, ['secret', 'httpRemoteAddr', 'httpFrom', 'requestId'], ['string', 'null', 'undefined']);
208 this._ensureTypes(data
, ['leaseSeconds'], ['number']);
209 this._ensureTypes(data
, ['isPublisherValidated'], ['boolean']);
214 * Basic field validation for updating verification data.
215 * @param {Object} verification
217 _verificationUpdateDataValidate(data
) {
218 this._ensureTypes(data
, ['verificationId'], ['string', 'number']);
219 this._ensureTypes(data
, ['mode'], ['string']);
220 this._ensureTypes(data
, ['reason'], ['string', 'null', 'undefined']);
221 this._ensureTypes(data
, ['isPublisherValidated'], ['boolean']);
226 * Basic field validation for upserting subscription data.
227 * @param {Object} subscription
229 _subscriptionUpsertDataValidate(data
) {
230 this._ensureTypes(data
, ['topicId'], ['string', 'number']);
231 this._ensureTypes(data
, ['callback'], ['string']);
232 this._ensureTypes(data
, ['leaseSeconds'], ['number']);
233 this._ensureTypes(data
, ['secret', 'httpRemoteAddr', 'httpFrom'], ['string', 'null', 'undefined']);
237 _subscriptionUpdateDataValidate(data
) {
238 this._ensureTypes(data
, ['signatureAlgorithm'], ['string', 'null', 'undefined']);
239 if (!common
.validHash(data
.signatureAlgorithm
)) {
240 throw new DBErrors
.DataValidation('unsupported hash algorithm');
245 /* Interface methods */
248 * Normalize query information to a common form from a specific backend.
250 * @returns {Object} info
251 * @returns {Number} info.changes
252 * @returns {*} info.lastInsertRowid
253 * @returns {Number} info.duration
255 _engineInfo(result
) {
256 this._notImplemented('engineInfo', arguments
);
261 * Query the current schema version.
262 * This is a standalone query function, as it is called before statements are loaded.
263 * @returns {Object} version
264 * @returns {Number} version.major
265 * @returns {Number} version.minor
266 * @returns {Number} version.patch
268 async
_currentSchema() {
269 this._notImplemented('_currentSchema', arguments
);
274 * Wrap a function call in a database context.
275 * @param {Function} fn fn(ctx)
278 this._notImplemented('context', arguments
);
283 * Wrap a function call in a transaction context.
285 * @param {Function} fn fn(txCtx)
287 async
transaction(dbCtx
, fn
) {
288 this._notImplemented('transaction', arguments
);
293 * Store an authentication success event.
295 * @param {String} identifier
297 async
authenticationSuccess(dbCtx
, identifier
) {
298 this._notImplemented('authenticationSuccess', arguments
);
303 * Fetch authentication data for identifier.
305 * @param {*} identifier
307 async
authenticationGet(dbCtx
, identifier
) {
308 this._notImplemented('authenticationGet', arguments
);
313 * Create or update an authentication entity.
315 * @param {String} identifier
316 * @param {String} credential
318 async
authenticationUpsert(dbCtx
, identifier
, credential
) {
319 this._notImplemented('authenticationUpsert', arguments
);
324 * All subscriptions to a topic.
326 * @param {String} topicId
328 async
subscriptionsByTopicId(dbCtx
, topicId
) {
329 this._notImplemented('subscriptionsByTopicId', arguments
);
334 * Number of subscriptions to a topic.
336 * @param {String} topicUrl
338 async
subscriptionCountByTopicUrl(dbCtx
, topicUrl
) {
339 this._notImplemented('subscriptionCountByTopicUrl', arguments
);
344 * Remove an existing subscription.
346 * @param {String} callback
349 async
subscriptionDelete(dbCtx
, callback
, topicId
) {
350 this._notImplemented('subscriptionDelete', arguments
);
355 * Remove any expired subscriptions to a topic.
359 async
subscriptionDeleteExpired(dbCtx
, topicId
) {
360 this._notImplemented('subscriptionDeleteExpired', arguments
);
365 * Claim subscriptions needing content updates attempted.
367 * @param {Number} wanted maximum subscription updates to claim
368 * @param {Integer} claimTimeoutSeconds age of claimed updates to reclaim
369 * @param {String} claimant
370 * @returns {Array} list of subscriptions
372 async
subscriptionDeliveryClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
373 this._notImplemented('subscriptionDeliveryClaim', arguments
);
378 * Claim a subscription delivery.
380 * @param {*} subscriptionId
381 * @param {*} claimTimeoutSeconds
382 * @param {*} claimant
384 async
subscriptionDeliveryClaimById(dbCtx
, subscriptionId
, claimTimeoutSeconds
, claimant
) {
385 this._notImplemented('subscriptionDeliveryClaimById', arguments
);
390 * A subscriber successfully received new topic content, update subscription.
392 * @param {String} callback
395 async
subscriptionDeliveryComplete(dbCtx
, callback
, topicId
) {
396 this._notImplemented('subscriptionDeliveryComplete', arguments
);
401 * A subscriber denied new topic content, remove subscription.
403 * @param {String} callback
406 async
subscriptionDeliveryGone(dbCtx
, callback
, topicId
) {
407 this._notImplemented('subscriptionDeliveryGone', arguments
);
412 * An attempt to deliver content to a subscriber did not complete, update delivery accordingly.
414 * @param {String} callback
416 * @param {Number[]} retryDelays
418 async
subscriptionDeliveryIncomplete(dbCtx
, callback
, topicId
, retryDelays
) {
419 this._notImplemented('subscriptionDeliveryIncomplete', arguments
);
424 * Fetch subscription details
426 * @param {String} callback
429 async
subscriptionGet(dbCtx
, callback
, topicId
) {
430 this._notImplemented('subscriptionGet', arguments
);
435 * Fetch subscription details
437 * @param {*} subscriptionId
439 async
subscriptionGetById(dbCtx
, subscriptionId
) {
440 this._notImplemented('subscriptionGetById', arguments
);
445 * Set subscription details
447 * @param {Object} data
448 * @param {String} data.callback
449 * @param {*} data.topicId
450 * @param {Number} data.leaseSeconds
451 * @param {String=} data.secret
452 * @param {String=} data.httpRemoteAddr
453 * @param {String=} data.httpFrom
455 async
subscriptionUpsert(dbCtx
, data
) {
456 this._notImplemented('subscriptionUpsert', arguments
);
461 * Set some subscription fields
463 * @param {Object} data
464 * @param {*} data.subscriptionId
465 * @param {String} data.signatureAlgorithm
467 async
subscriptionUpdate(dbCtx
, data
) {
468 this._notImplemented('subscriptionUpdate', arguments
);
473 * Sets the isDeleted flag on a topic, and reset update time.
477 async
topicDeleted(dbCtx
, topicId
) {
478 this._notImplemented('topicDeleted', arguments
);
483 * Claim topics to fetch updates for, from available.
485 * @param {Integer} wanted maximum topic fetches to claim
486 * @param {Integer} claimTimeoutSeconds age of claimed topics to reclaim
487 * @param {String} claimant node id claiming these fetches
489 async
topicFetchClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
490 this._notImplemented('topicFetchClaim', arguments
);
495 * Claim a topic to update.
498 * @param {Integer} claimTimeoutSeconds age of claimed topics to reclaim
499 * @param {String} claimant node id claiming these fetches
501 async
topicFetchClaimById(dbCtx
, topicId
, claimTimeoutSeconds
, claimant
) {
502 this._notImplemented('topicFetchClaim', arguments
);
507 * Reset publish state, and reset deliveries for subscribers.
511 async
topicFetchComplete(dbCtx
, topicId
) {
512 this._notImplemented('topicFetchComplete', arguments
);
517 * Bump count of attempts and release claim on update.
520 * @param {Number[]} retryDelays
522 async
topicFetchIncomplete(dbCtx
, topicId
, retryDelays
) {
523 this._notImplemented('topicFetchIncomplete', arguments
);
528 * Set a topic as ready to be checked for an update.
533 async
topicFetchRequested(dbCtx
, topicId
) {
534 this._notImplemented('topicPublish', arguments
);
539 * Get all data for all topics, including subscription count.
542 async
topicGetAll(dbCtx
) {
543 this._notImplemented('topicGetAll', arguments
);
548 * Get topic data, without content.
550 * @param {String} topicUrl
552 async
topicGetByUrl(dbCtx
, topicUrl
) {
553 this._notImplemented('topicGetByUrl', arguments
);
558 * Get topic data, without content.
561 * @param {Boolean} applyDefaults
563 async
topicGetById(dbCtx
, topicId
, applyDefaults
= true) {
564 this._notImplemented('topicGetById', arguments
);
569 * Returns topic data with content.
573 async
topicGetContentById(dbCx
, topicId
) {
574 this._notImplemented('topicGetContentById', arguments
);
579 * Attempt to delete a topic, which must be set isDeleted, if there
580 * are no more subscriptions belaying its removal.
583 async
topicPendingDelete(dbCtx
, topicId
) {
584 this._notImplemented('topicPendingDelete', arguments
);
589 * Create or update the basic parameters of a topic.
591 * @param {TopicData} data
593 async
topicSet(dbCtx
, data
) {
594 this._notImplemented('topicSet', arguments
);
599 * Updates a topic's content data and content update timestamp.
600 * @param {Object} data
601 * @param {Integer} data.topicId
602 * @param {String} data.content
603 * @param {String} data.contentHash
604 * @param {String=} data.contentType
606 async
topicSetContent(dbCtx
, data
) {
607 this._notImplemented('topicSetContent', arguments
);
612 * Set some topic fields.
614 * @param {Object} data
615 * @param {*} data.topicId
616 * @param {Number=} data.leaseSecondsPreferred
617 * @param {Number=} data.leaseSecondsMin
618 * @param {Number=} data.leaseSecondsMax
619 * @param {String=} data.publisherValidationUrl
620 * @param {String=} data.contentHashAlgorithm
622 async
topicUpdate(dbCtx
, data
) {
623 this._notImplemented('topicUpdate', arguments
);
628 * Claim pending verifications for attempted resolution.
630 * @param {Integer} wanted maximum verifications to claim
631 * @param {Integer} claimTimeoutSeconds age of claimed verifications to reclaim
632 * @returns {Verification[]} array of claimed verifications
634 async
verificationClaim(dbCtx
, wanted
, claimTimeoutSeconds
, claimant
) {
635 this._notImplemented('verificationClaim', arguments
);
640 * Claim a specific verification by id, if no other similar verification claimed.
642 * @param {*} verificationId
643 * @param {Number} claimTimeoutSeconds
644 * @param {String} claimant
646 async
verificationClaimById(dbCtx
, verificationId
, claimTimeoutSeconds
, claimant
) {
647 this._notImplemented('verificationClaimById', arguments
);
652 * Remove the verification, any older
653 * verifications for that same client/topic, and the claim.
655 * @param {*} verificationId
656 * @param {String} callback
659 async
verificationComplete(dbCtx
, verificationId
, callback
, topicId
) {
660 this._notImplemented('verificationComplete', arguments
);
665 * Get verification data.
667 * @param {*} verificationId
669 async
verificationGetById(dbCtx
, verificationId
) {
670 this._notImplemented('verificationGetById', arguments
);
675 * Update database that a client verification was unable to complete.
676 * This releases the delivery claim and reschedules for some future time.
678 * @param {String} callback client callback url
679 * @param {*} topicId internal topic id
680 * @param {Number[]} retryDelays
682 async
verificationIncomplete(dbCtx
, verificationId
, retryDelays
) {
683 this._notImplemented('verificationIncomplete', arguments
);
688 * Create a new pending verification.
690 * @param {VerificationData} data
691 * @param {Boolean} claim
692 * @returns {*} verificationId
694 async
verificationInsert(dbCtx
, verification
) {
695 this._notImplemented('verificationInsert', arguments
);
700 * Relinquish the claim on a verification, without any other updates.
702 * @param {String} callback client callback url
703 * @param {*} topicId internal topic id
705 async
verificationRelease(dbCtx
, verificationId
) {
706 this._notImplemented('verificationRelease', arguments
);
711 * Updates some fields of an existing (presumably claimed) verification.
713 * @param {*} verificationId
714 * @param {Object} data
715 * @param {String} data.mode
716 * @param {String} data.reason
717 * @param {Boolean} data.isPublisherValidated
719 async
verificationUpdate(dbCtx
, verificationId
, data
) {
720 this._notImplemented('verificationUpdate', arguments
);
725 * Sets the isPublisherValidated flag on a verification and resets the delivery
727 * @param {*} verificationId
729 async
verificationValidated(dbCtx
, verificationId
) {
730 this._notImplemented('verificationValidated', arguments
);
735 module
.exports
= Database
;