1 /* eslint-disable no-unused-vars */
5 * This is the semi-abstract database class, providing interface and utility methods.
8 const common
= require('../common');
9 const DBErrors
= require('./errors');
10 const svh
= require('./schema-version-helper');
12 const _fileScope
= common
.fileScope(__filename
);
class Database {
  /**
   * Semi-abstract database class: provides shared validation/utility helpers
   * and declares the interface methods that engine implementations override.
   * @param {Object} logger logger instance; defaults to common.nullLogger
   * @param {Object} options
   * @param {Object=} options.topicLeaseDefaults overrides for the default topic lease values
   */
  constructor(logger = common.nullLogger, options = {}) {
    this.logger = logger;
    common.ensureLoggerLevels(this.logger);

    // Store the merged config and default values for lease values.
    // N.B. breaking hierarchy of config options here
    this.topicLeaseDefaults = {};
    common.setOptions(this.topicLeaseDefaults, common.topicLeaseDefaults(), options.topicLeaseDefaults || {});
  }


  /**
   * Turn a snake into a camel.
   * Used when translating SQL column names to JS object style.
   * @param {String} snakeCase
   * @param {String|RegExp} delimiter
   * @returns {String|undefined} camelCase string, or undefined if input cannot be split
   */
  static _camelfy(snakeCase, delimiter = '_') {
    if (!snakeCase || typeof snakeCase.split !== 'function') {
      return undefined;
    }
    // The first word is kept verbatim, every following word is capitalized.
    const [first, ...rest] = snakeCase.split(delimiter);
    return [
      first,
      ...rest.map((word) => word.charAt(0).toUpperCase() + word.slice(1)),
    ].join('');
  }


  /**
   * Basic type checking of object properties.
   * Beyond plain `typeof` names, the pseudo-types 'buffer' (Buffer instance)
   * and 'null' are recognized, and a bigint value satisfies 'number'.
   * @param {Object} object
   * @param {String[]} properties
   * @param {String[]} types
   * @throws {DBErrors.DataValidation} on missing argument or mismatched type
   */
  _ensureTypes(object, properties, types) {
    const _scope = _fileScope('_ensureTypes');

    if (!(object && properties && types)) {
      this.logger.error(_scope, 'undefined argument', { object, properties, types });
      throw new DBErrors.DataValidation();
    }
    properties.forEach((p) => {
      // eslint-disable-next-line security/detect-object-injection
      const pObj = object[p];
      const pType = typeof pObj;
      const isAcceptable = types.includes(pType)
        || (pObj instanceof Buffer && types.includes('buffer'))
        || (pObj === null && types.includes('null'))
        || (pType === 'bigint' && types.includes('number'));
      if (!isAcceptable) {
        const reason = `'${p}' is '${pType}', but must be ${types.length > 1 ? 'one of ' : ''}'${types}'`;
        this.logger.error(_scope, reason, {});
        throw new DBErrors.DataValidation(reason);
      }
    });
  }


  /**
   * Interface methods need implementations.
   * Logs the call and always throws.
   * @param {String} method name of the unimplemented method
   * @param {arguments} args arguments the method was called with
   * @throws {DBErrors.NotImplemented}
   */
  _notImplemented(method, args) {
    this.logger.error(_fileScope(method), 'abstract method called', Array.from(args));
    throw new DBErrors.NotImplemented(method);
  }


  /**
   * Perform tasks needed to prepare database for use.  Ensure this is called
   * after construction, and before any other database activity.
   * At the minimum, this will validate a compatible schema is present and usable.
   * Some engines will also perform other initializations or async actions which
   * are easier handled outside the constructor.
   * Reads this.schemaVersionsSupported ({ min, max }), which is not set in this
   * class -- presumably provided by the engine implementation.
   * @throws {DBErrors.MigrationNeeded} if the current schema is outside the supported range
   */
  async initialize() {
    const _scope = _fileScope('initialize');

    const currentSchema = await this._currentSchema();
    const current = svh.schemaVersionObjectToNumber(currentSchema);
    const min = svh.schemaVersionObjectToNumber(this.schemaVersionsSupported.min);
    const max = svh.schemaVersionObjectToNumber(this.schemaVersionsSupported.max);
    // Supported when the current version lies within the inclusive [min, max] range.
    if (min <= current && current <= max) {
      this.logger.debug(_scope, 'schema supported', { currentSchema, schemaVersionsSupported: this.schemaVersionsSupported });
    } else {
      this.logger.error(_scope, 'schema not supported', { currentSchema, schemaVersionsSupported: this.schemaVersionsSupported });
      throw new DBErrors.MigrationNeeded();
    }
  }


  /**
   * Perform db connection healthcheck.
   */
  async healthCheck() {
    this._notImplemented('healthCheck', arguments);
  }


  /**
   * Replace any NULL from topic DB entry with default values.
   * The topic object is updated in place; missing keys are populated as well.
   * @param {Object} topic
   * @returns {Object} the same topic object
   */
  _topicDefaults(topic) {
    if (topic) {
      for (const [key, value] of Object.entries(this.topicLeaseDefaults)) {
        // eslint-disable-next-line security/detect-object-injection
        if (!(key in topic) || topic[key] === null) {
          // eslint-disable-next-line security/detect-object-injection
          topic[key] = value;
        }
      }
    }
    return topic;
  }


  /**
   * Ensures any lease durations in data are consistent.
   * @param {Object} data
   * @throws {DBErrors.DataValidation} on wrong type, non-positive value, or min/preferred/max out of order
   */
  _leaseDurationsValidate(data) {
    const leaseProperties = Object.keys(this.topicLeaseDefaults);
    this._ensureTypes(data, leaseProperties, ['number', 'undefined', 'null']);

    // Populate defaults on a copy of values so we can check proper numerical ordering
    const leaseValues = common.pick(data, leaseProperties);
    this._topicDefaults(leaseValues);
    for (const [prop, value] of Object.entries(leaseValues)) {
      if (value <= 0) {
        throw new DBErrors.DataValidation(`${prop} must be positive`);
      }
    }
    if (!(leaseValues.leaseSecondsMin <= leaseValues.leaseSecondsPreferred && leaseValues.leaseSecondsPreferred <= leaseValues.leaseSecondsMax)) {
      throw new DBErrors.DataValidation('lease durations violate numerical ordering');
    }
  }


  /**
   * Basic field validation for setting topic data.
   * @param {Object} data
   * @throws {DBErrors.DataValidation}
   */
  _topicSetDataValidate(data) {
    this._ensureTypes(data, ['url'], ['string']);
    this._ensureTypes(data, ['publisherValidationUrl'], ['string', 'undefined', 'null']);
    this._leaseDurationsValidate(data);
  }


  /**
   * Basic field validation for setting topic content.
   * @param {Object} data
   * @throws {DBErrors.DataValidation}
   */
  _topicSetContentDataValidate(data) {
    this._ensureTypes(data, ['content'], ['string', 'buffer']);
    this._ensureTypes(data, ['contentHash'], ['string']);
    this._ensureTypes(data, ['contentType'], ['string', 'null', 'undefined']);
  }


  /**
   * Basic field validation for updating topic.
   * @param {Object} data
   * @throws {DBErrors.DataValidation}
   */
  _topicUpdateDataValidate(data) {
    this._ensureTypes(data, ['publisherValidationUrl'], ['string', 'undefined', 'null']);
    if (data.publisherValidationUrl) {
      try {
        // Constructed only to see whether it parses; the result is discarded.
        new URL(data.publisherValidationUrl);
      } catch (e) {
        throw new DBErrors.DataValidation('invalid URL format');
      }
    }
    this._ensureTypes(data, ['contentHashAlgorithm'], ['string']);
    if (!common.validHash(data.contentHashAlgorithm)) {
      throw new DBErrors.DataValidation('unsupported hash algorithm');
    }
    this._leaseDurationsValidate(data);
  }


  /**
   * Basic field validation for setting verification data.
   * @param {Object} data
   * @throws {DBErrors.DataValidation}
   */
  _verificationDataValidate(data) {
    this._ensureTypes(data, ['topicId'], ['string', 'number']);
    this._ensureTypes(data, ['callback', 'mode'], ['string']);
    this._ensureTypes(data, ['secret', 'httpRemoteAddr', 'httpFrom', 'requestId'], ['string', 'null', 'undefined']);
    this._ensureTypes(data, ['leaseSeconds'], ['number']);
    this._ensureTypes(data, ['isPublisherValidated'], ['boolean']);
  }


  /**
   * Basic field validation for updating verification data.
   * @param {Object} data
   * @throws {DBErrors.DataValidation}
   */
  _verificationUpdateDataValidate(data) {
    this._ensureTypes(data, ['verificationId'], ['string', 'number']);
    this._ensureTypes(data, ['mode'], ['string']);
    this._ensureTypes(data, ['reason'], ['string', 'null', 'undefined']);
    this._ensureTypes(data, ['isPublisherValidated'], ['boolean']);
  }


  /**
   * Basic field validation for upserting subscription data.
   * @param {Object} data
   * @throws {DBErrors.DataValidation}
   */
  _subscriptionUpsertDataValidate(data) {
    this._ensureTypes(data, ['topicId'], ['string', 'number']);
    this._ensureTypes(data, ['callback'], ['string']);
    this._ensureTypes(data, ['leaseSeconds'], ['number']);
    this._ensureTypes(data, ['secret', 'httpRemoteAddr', 'httpFrom'], ['string', 'null', 'undefined']);
  }


  /**
   * Basic field validation for updating subscription data.
   * @param {Object} data
   * @throws {DBErrors.DataValidation}
   */
  _subscriptionUpdateDataValidate(data) {
    this._ensureTypes(data, ['signatureAlgorithm'], ['string', 'null', 'undefined']);
    // NOTE(review): null/undefined pass the type check above but are still
    // handed to validHash -- confirm validHash accepts them, if they are meant to be allowed.
    if (!common.validHash(data.signatureAlgorithm)) {
      throw new DBErrors.DataValidation('unsupported hash algorithm');
    }
  }


  /* Interface methods */

  /**
   * Normalize query information to a common form from a specific backend.
   * @param {*} result
   * @returns {Object} info
   * @returns {Number} info.changes
   * @returns {*} info.lastInsertRowid
   * @returns {Number} info.duration
   */
  _engineInfo(result) {
    this._notImplemented('_engineInfo', arguments);
  }


  /**
   * Query the current schema version.
   * This is a standalone query function, as it is called before statements are loaded.
   * @returns {Object} version
   * @returns {Number} version.major
   * @returns {Number} version.minor
   * @returns {Number} version.patch
   */
  async _currentSchema() {
    this._notImplemented('_currentSchema', arguments);
  }


  /**
   * Wrap a function call in a database context.
   * @param {Function} fn fn(ctx)
   */
  async context(fn) {
    this._notImplemented('context', arguments);
  }


  /**
   * Wrap a function call in a transaction context.
   * @param {*} dbCtx
   * @param {Function} fn fn(txCtx)
   */
  async transaction(dbCtx, fn) {
    this._notImplemented('transaction', arguments);
  }


  /**
   * Store an authentication success event.
   * @param {*} dbCtx
   * @param {String} identifier
   */
  async authenticationSuccess(dbCtx, identifier) {
    this._notImplemented('authenticationSuccess', arguments);
  }


  /**
   * Fetch authentication data for identifier.
   * @param {*} dbCtx
   * @param {*} identifier
   */
  async authenticationGet(dbCtx, identifier) {
    this._notImplemented('authenticationGet', arguments);
  }


  /**
   * Create or update an authentication entity.
   * @param {*} dbCtx
   * @param {String} identifier
   * @param {String} credential
   */
  async authenticationUpsert(dbCtx, identifier, credential) {
    this._notImplemented('authenticationUpsert', arguments);
  }


  /**
   * All subscriptions to a topic.
   * @param {*} dbCtx
   * @param {String} topicId
   */
  async subscriptionsByTopicId(dbCtx, topicId) {
    this._notImplemented('subscriptionsByTopicId', arguments);
  }


  /**
   * Number of subscriptions to a topic.
   * @param {*} dbCtx
   * @param {String} topicUrl
   */
  async subscriptionCountByTopicUrl(dbCtx, topicUrl) {
    this._notImplemented('subscriptionCountByTopicUrl', arguments);
  }


  /**
   * Remove an existing subscription.
   * @param {*} dbCtx
   * @param {String} callback
   * @param {*} topicId
   */
  async subscriptionDelete(dbCtx, callback, topicId) {
    this._notImplemented('subscriptionDelete', arguments);
  }


  /**
   * Claim subscriptions needing content updates attempted.
   * @param {*} dbCtx
   * @param {Number} wanted maximum subscription updates to claim
   * @param {Integer} claimTimeoutSeconds age of claimed updates to reclaim
   * @param {String} claimant
   * @returns {Array} list of subscriptions
   */
  async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
    this._notImplemented('subscriptionDeliveryClaim', arguments);
  }


  /**
   * Claim a subscription delivery.
   * @param {*} dbCtx
   * @param {*} subscriptionId
   * @param {*} claimTimeoutSeconds
   * @param {*} claimant
   */
  async subscriptionDeliveryClaimById(dbCtx, subscriptionId, claimTimeoutSeconds, claimant) {
    this._notImplemented('subscriptionDeliveryClaimById', arguments);
  }


  /**
   * A subscriber successfully received new topic content, update subscription.
   * @param {*} dbCtx
   * @param {String} callback
   * @param {*} topicId
   */
  async subscriptionDeliveryComplete(dbCtx, callback, topicId) {
    this._notImplemented('subscriptionDeliveryComplete', arguments);
  }


  /**
   * A subscriber denied new topic content, remove subscription.
   * @param {*} dbCtx
   * @param {String} callback
   * @param {*} topicId
   */
  async subscriptionDeliveryGone(dbCtx, callback, topicId) {
    this._notImplemented('subscriptionDeliveryGone', arguments);
  }


  /**
   * An attempt to deliver content to a subscriber did not complete, update delivery accordingly.
   * @param {*} dbCtx
   * @param {String} callback
   * @param {*} topicId
   * @param {Number[]} retryDelays
   */
  async subscriptionDeliveryIncomplete(dbCtx, callback, topicId, retryDelays) {
    this._notImplemented('subscriptionDeliveryIncomplete', arguments);
  }


  /**
   * Fetch subscription details
   * @param {*} dbCtx
   * @param {String} callback
   * @param {*} topicId
   */
  async subscriptionGet(dbCtx, callback, topicId) {
    this._notImplemented('subscriptionGet', arguments);
  }


  /**
   * Fetch subscription details
   * @param {*} dbCtx
   * @param {*} subscriptionId
   */
  async subscriptionGetById(dbCtx, subscriptionId) {
    this._notImplemented('subscriptionGetById', arguments);
  }


  /**
   * Set subscription details
   * @param {*} dbCtx
   * @param {Object} data
   * @param {String} data.callback
   * @param {*} data.topicId
   * @param {Number} data.leaseSeconds
   * @param {String=} data.secret
   * @param {String=} data.httpRemoteAddr
   * @param {String=} data.httpFrom
   */
  async subscriptionUpsert(dbCtx, data) {
    this._notImplemented('subscriptionUpsert', arguments);
  }


  /**
   * Set some subscription fields
   * @param {*} dbCtx
   * @param {Object} data
   * @param {*} data.subscriptionId
   * @param {String} data.signatureAlgorithm
   */
  async subscriptionUpdate(dbCtx, data) {
    this._notImplemented('subscriptionUpdate', arguments);
  }


  /**
   * Sets the isDeleted flag on a topic, and reset update time.
   * @param {*} dbCtx
   * @param {*} topicId
   */
  async topicDeleted(dbCtx, topicId) {
    this._notImplemented('topicDeleted', arguments);
  }


  /**
   * Claim topics to fetch updates for, from available.
   * @param {*} dbCtx
   * @param {Integer} wanted maximum topic fetches to claim
   * @param {Integer} claimTimeoutSeconds age of claimed topics to reclaim
   * @param {String} claimant node id claiming these fetches
   */
  async topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
    this._notImplemented('topicFetchClaim', arguments);
  }


  /**
   * Claim a topic to update.
   * @param {*} dbCtx
   * @param {*} topicId
   * @param {Integer} claimTimeoutSeconds age of claimed topics to reclaim
   * @param {String} claimant node id claiming these fetches
   */
  async topicFetchClaimById(dbCtx, topicId, claimTimeoutSeconds, claimant) {
    this._notImplemented('topicFetchClaimById', arguments);
  }


  /**
   * Reset publish state, and reset deliveries for subscribers.
   * @param {*} dbCtx
   * @param {*} topicId
   */
  async topicFetchComplete(dbCtx, topicId) {
    this._notImplemented('topicFetchComplete', arguments);
  }


  /**
   * Bump count of attempts and release claim on update.
   * @param {*} dbCtx
   * @param {*} topicId
   * @param {Number[]} retryDelays
   */
  async topicFetchIncomplete(dbCtx, topicId, retryDelays) {
    this._notImplemented('topicFetchIncomplete', arguments);
  }


  /**
   * Set a topic as ready to be checked for an update.
   * @param {*} dbCtx
   * @param {*} topicId
   */
  async topicFetchRequested(dbCtx, topicId) {
    this._notImplemented('topicFetchRequested', arguments);
  }


  /**
   * Get all data for all topics, including subscription count.
   * @param {*} dbCtx
   */
  async topicGetAll(dbCtx) {
    this._notImplemented('topicGetAll', arguments);
  }


  /**
   * Get topic data, without content.
   * @param {*} dbCtx
   * @param {String} topicUrl
   */
  async topicGetByUrl(dbCtx, topicUrl) {
    this._notImplemented('topicGetByUrl', arguments);
  }


  /**
   * Get topic data, without content.
   * @param {*} dbCtx
   * @param {*} topicId
   * @param {Boolean} applyDefaults
   */
  async topicGetById(dbCtx, topicId, applyDefaults = true) {
    this._notImplemented('topicGetById', arguments);
  }


  /**
   * Returns topic data with content.
   * @param {*} dbCx
   * @param {*} topicId
   */
  async topicGetContentById(dbCx, topicId) {
    this._notImplemented('topicGetContentById', arguments);
  }


  // /**
  //  * Call after an unsubscribe, to check if a topic is awaiting deletion, and that
  //  * was the last subscription belaying it.
  //  * @param {String|Integer} data topic url or id
  //  */
  // async topicPendingDelete(dbCtx, data) {
  //   this._notImplemented('topicPendingDelete', arguments);
  // }


  /**
   * Create or update the basic parameters of a topic.
   * @param {*} dbCtx
   * @param {TopicData} data
   */
  async topicSet(dbCtx, data) {
    this._notImplemented('topicSet', arguments);
  }


  /**
   * Updates a topic's content data and content update timestamp.
   * @param {*} dbCtx
   * @param {Object} data
   * @param {Integer} data.topicId
   * @param {String} data.content
   * @param {String} data.contentHash
   * @param {String=} data.contentType
   */
  async topicSetContent(dbCtx, data) {
    this._notImplemented('topicSetContent', arguments);
  }


  /**
   * Set some topic fields.
   * @param {*} dbCtx
   * @param {Object} data
   * @param {*} data.topicId
   * @param {Number=} data.leaseSecondsPreferred
   * @param {Number=} data.leaseSecondsMin
   * @param {Number=} data.leaseSecondsMax
   * @param {String=} data.publisherValidationUrl
   * @param {String=} data.contentHashAlgorithm
   */
  async topicUpdate(dbCtx, data) {
    this._notImplemented('topicUpdate', arguments);
  }


  /**
   * Claim pending verifications for attempted resolution.
   * @param {*} dbCtx
   * @param {Integer} wanted maximum verifications to claim
   * @param {Integer} claimTimeoutSeconds age of claimed verifications to reclaim
   * @param {String} claimant
   * @returns {Verification[]} array of claimed verifications
   */
  async verificationClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
    this._notImplemented('verificationClaim', arguments);
  }


  /**
   * Claim a specific verification by id, if no other similar verification claimed.
   * @param {*} dbCtx
   * @param {*} verificationId
   * @param {Number} claimTimeoutSeconds
   * @param {String} claimant
   */
  async verificationClaimById(dbCtx, verificationId, claimTimeoutSeconds, claimant) {
    this._notImplemented('verificationClaimById', arguments);
  }


  /**
   * Remove the verification, any older
   * verifications for that same client/topic, and the claim.
   * @param {*} dbCtx
   * @param {*} verificationId
   * @param {String} callback
   * @param {*} topicId
   */
  async verificationComplete(dbCtx, verificationId, callback, topicId) {
    this._notImplemented('verificationComplete', arguments);
  }


  /**
   * Get verification data.
   * @param {*} dbCtx
   * @param {*} verificationId
   */
  async verificationGetById(dbCtx, verificationId) {
    this._notImplemented('verificationGetById', arguments);
  }


  /**
   * Update database that a client verification was unable to complete.
   * This releases the delivery claim and reschedules for some future time.
   * @param {*} dbCtx
   * @param {*} verificationId
   * @param {Number[]} retryDelays
   */
  async verificationIncomplete(dbCtx, verificationId, retryDelays) {
    this._notImplemented('verificationIncomplete', arguments);
  }


  /**
   * Create a new pending verification.
   * @param {*} dbCtx
   * @param {VerificationData} verification
   * @returns {*} verificationId
   */
  async verificationInsert(dbCtx, verification) {
    this._notImplemented('verificationInsert', arguments);
  }


  /**
   * Relinquish the claim on a verification, without any other updates.
   * @param {*} dbCtx
   * @param {*} verificationId
   */
  async verificationRelease(dbCtx, verificationId) {
    this._notImplemented('verificationRelease', arguments);
  }


  /**
   * Updates some fields of an existing (presumably claimed) verification.
   * @param {*} dbCtx
   * @param {*} verificationId
   * @param {Object} data
   * @param {String} data.mode
   * @param {String=} data.reason
   * @param {Boolean} data.isPublisherValidated
   */
  async verificationUpdate(dbCtx, verificationId, data) {
    this._notImplemented('verificationUpdate', arguments);
  }


  /**
   * Sets the isPublisherValidated flag on a verification and resets the delivery
   * @param {*} dbCtx
   * @param {*} verificationId
   */
  async verificationValidated(dbCtx, verificationId) {
    this._notImplemented('verificationValidated', arguments);
  }

}
723 module
.exports
= Database
;