const _fileScope = common.fileScope(__filename);
class Database {
- constructor(logger = common.nullLogger, options = {}) {
+ constructor(logger, options = {}) {
this.logger = logger;
- common.ensureLoggerLevels(this.logger);
// Store the merged config and default values for lease values.
// N.B. breaking hierarchy of config options here
/**
* Turn a snake into a camel.
* Used when translating SQL column names to JS object style.
- * @param {String} snakeCase
- * @param {String|RegExp} delimiter
- * @returns {String}
+ * @param {string} snakeCase snake case string
+ * @param {string | RegExp} delimiter default '_'
+ * @returns {string} camelCaseString
*/
static _camelfy(snakeCase, delimiter = '_') {
if (!snakeCase || typeof snakeCase.split !== 'function') {
/**
* Basic type checking of object properties.
- * @param {Object} object
- * @param {String[]} properties
- * @param {String[]} types
+ * @param {object} object object
+ * @param {string[]} properties list of property names
+ * @param {string[]} types list of valid types for property names
*/
_ensureTypes(object, properties, types) {
const _scope = _fileScope('_ensureTypes');
/**
* Interface methods need implementations.
- * @param {String} method
- * @param {arguments} args
+ * @param {string} method method name
+ * @param {arguments} args arguments
*/
_notImplemented(method, args) {
this.logger.error(_fileScope(method), 'abstract method called', Array.from(args));
* At the minimum, this will validate a compatible schema is present and usable.
* Some engines will also perform other initializations or async actions which
* are easier handled outside the constructor.
- */
+ * @returns {Promise<void>}
+ */
async initialize() {
const _scope = _fileScope('initialize');
/**
* Perform db connection healthcheck.
+ * @returns {Promise<void>}
*/
async healthCheck() {
this._notImplemented('healthCheck', arguments);
/**
* Replace any NULL from topic DB entry with default values.
- * @param {Object} topic
- * @returns {Object}
+ * @param {object} topic topic entry
+ * @returns {object} updated topic entry
*/
_topicDefaults(topic) {
if (topic) {
/**
* Ensures any lease durations in data are consistent.
- * @param {Object} data
+ * @param {object} data topic data
*/
_leaseDurationsValidate(data) {
- const leaseProperties = Object.keys(this.topicLeaseDefaults)
+ const leaseProperties = Object.keys(this.topicLeaseDefaults);
this._ensureTypes(data, leaseProperties, ['number', 'undefined', 'null']);
// Populate defaults on a copy of values so we can check proper numerical ordering
/**
* Basic field validation for setting topic data.
- * @param {Object} data
+ * @param {object} data topic data
*/
_topicSetDataValidate(data) {
this._ensureTypes(data, ['url'], ['string']);
/**
* Basic field validation for setting topic content.
- * @param {Object} data
+ * @param {object} data topic data
*/
_topicSetContentDataValidate(data) {
this._ensureTypes(data, ['content'], ['string', 'buffer']);
this._ensureTypes(data, ['contentHash'], ['string']);
this._ensureTypes(data, ['contentType'], ['string', 'null', 'undefined']);
+ this._ensureTypes(data, ['eTag'], ['string', 'null', 'undefined']);
+ this._ensureTypes(data, ['lastModified'], ['string', 'null', 'undefined']);
}
/**
* Basic field validation for updating topic.
- * @param {Object} data
+ * @param {object} data topic data
*/
_topicUpdateDataValidate(data) {
this._ensureTypes(data, ['publisherValidationUrl'], ['string', 'undefined', 'null']);
/**
* Basic field validation for setting verification data.
- * @param {Object} data
+ * @param {object} data topic data
*/
_verificationDataValidate(data) {
this._ensureTypes(data, ['topicId'], ['string', 'number']);
/**
* Basic field validation for updating verification data.
- * @param {Object} verification
+ * @param {object} data verification data
*/
_verificationUpdateDataValidate(data) {
this._ensureTypes(data, ['verificationId'], ['string', 'number']);
/**
* Basic field validation for upserting subscription data.
- * @param {Object} subscription
+ * @param {object} data subscription data
*/
_subscriptionUpsertDataValidate(data) {
this._ensureTypes(data, ['topicId'], ['string', 'number']);
}
+ /**
+ * Basic field validation for subscription update data.
+ * @param {object} data subscription data
+ */
_subscriptionUpdateDataValidate(data) {
this._ensureTypes(data, ['signatureAlgorithm'], ['string', 'null', 'undefined']);
if (!common.validHash(data.signatureAlgorithm)) {
/* Interface methods */
+ /**
+ * @typedef {object} CommonDBInfo
+ * @property {number} changes result changes
+ * @property {*} lastInsertRowid result row id
+ * @property {number} duration result duration
+ */
/**
* Normalize query information to a common form from a specific backend.
- * @param {*} result
- * @returns {Object} info
- * @returns {Number} info.changes
- * @returns {*} info.lastInsertRowid
- * @returns {Number} info.duration
+ * @param {*} result db result
*/
_engineInfo(result) {
this._notImplemented('engineInfo', arguments);
}
+ /**
+ * @typedef {object} SchemaVersion
+ * @property {number} major semver major
+ * @property {number} minor semver minor
+ * @property {number} patch semver patch
+ */
/**
* Query the current schema version.
* This is a standalone query function, as it is called before statements are loaded.
- * @returns {Object} version
- * @returns {Number} version.major
- * @returns {Number} version.minor
- * @returns {Number} version.patch
+ * @returns {SchemaVersion} schema version
*/
async _currentSchema() {
this._notImplemented('_currentSchema', arguments);
/**
* Wrap a function call in a transaction context.
- * @param {*} dbCtx
+ * @param {*} dbCtx db context
* @param {Function} fn fn(txCtx)
*/
async transaction(dbCtx, fn) {
/**
* Store an authentication success event.
- * @param {*} dbCtx
- * @param {String} identifier
+ * @param {*} dbCtx db context
+ * @param {string} identifier authentication identifier
*/
async authenticationSuccess(dbCtx, identifier) {
this._notImplemented('authenticationSuccess', arguments);
/**
* Fetch authentication data for identifier.
- * @param {*} dbCtx
- * @param {*} identifier
+ * @param {*} dbCtx db context
+ * @param {*} identifier authentication identifier
*/
async authenticationGet(dbCtx, identifier) {
this._notImplemented('authenticationGet', arguments);
/**
* Create or update an authentication entity.
- * @param {*} dbCtx
- * @param {String} identifier
- * @param {String} credential
+ * @param {*} dbCtx db context
+ * @param {string} identifier authentication identifier
+ * @param {string} credential authentication credential
+ * @param {string=} otpKey authentication otp key
*/
- async authenticationUpsert(dbCtx, identifier, credential) {
+ async authenticationUpsert(dbCtx, identifier, credential, otpKey) {
this._notImplemented('authenticationUpsert', arguments);
}
+ /**
+ * Update an authentication entity's otp key.
+ * @param {*} dbCtx db context
+ * @param {string} identifier authentication identifier
+ * @param {string=} otpKey authentication otp key
+ */
+ async authenticationUpdateOTPKey(dbCtx, identifier, otpKey) {
+ this._notImplemented('authenticationUpdateKey', arguments);
+ }
+
+
+ /**
+ * Update an authentication entity's credential.
+ * @param {*} dbCtx db context
+ * @param {string} identifier authentication identifier
+ * @param {string} credential authentication credential
+ */
+ async authenticationUpdateCredential(dbCtx, identifier, credential) {
+ this._notImplemented('authenticationUpdateKey', arguments);
+ }
+
+
/**
* All subscriptions to a topic.
- * @param {*} dbCtx
- * @param {String} topicId
+ * @param {*} dbCtx db context
+ * @param {string} topicId topic id
*/
async subscriptionsByTopicId(dbCtx, topicId) {
this._notImplemented('subscriptionsByTopicId', arguments);
/**
* Number of subscriptions to a topic.
- * @param {*} dbCtx
- * @param {String} topicUrl
+ * @param {*} dbCtx db context
+ * @param {string} topicUrl topic url
*/
async subscriptionCountByTopicUrl(dbCtx, topicUrl) {
this._notImplemented('subscriptionCountByTopicUrl', arguments);
/**
* Remove an existing subscription.
- * @param {*} dbCtx
- * @param {String} callback
- * @param {*} topicId
+ * @param {*} dbCtx db context
+ * @param {string} callback subscriber callback url
+ * @param {*} topicId topic id
*/
async subscriptionDelete(dbCtx, callback, topicId) {
this._notImplemented('subscriptionDelete', arguments);
}
+ /**
+ * Remove any expired subscriptions to a topic.
+ * @param {*} dbCtx db context
+ * @param {*} topicId topic id
+ */
+ async subscriptionDeleteExpired(dbCtx, topicId) {
+ this._notImplemented('subscriptionDeleteExpired', arguments);
+ }
+
+
+ /**
+ * @alias {number} Integer
+ */
/**
* Claim subscriptions needing content updates attempted.
- * @param {*} dbCtx
- * @param {Number} wanted maximum subscription updates to claim
+ * @param {*} dbCtx db context
+ * @param {number} wanted maximum subscription updates to claim
* @param {Integer} claimTimeoutSeconds age of claimed updates to reclaim
- * @param {String} claimant
+ * @param {string} claimant worker claiming processing
* @returns {Array} list of subscriptions
*/
async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
/**
* Claim a subscription delivery.
- * @param {*} dbCtx
- * @param {*} subscriptionId
- * @param {*} claimTimeoutSeconds
- * @param {*} claimant
+ * @param {*} dbCtx db context
+ * @param {*} subscriptionId subscription id
+ * @param {number} claimTimeoutSeconds duration of claim
+ * @param {*} claimant worker claiming processing
*/
async subscriptionDeliveryClaimById(dbCtx, subscriptionId, claimTimeoutSeconds, claimant) {
this._notImplemented('subscriptionDeliveryClaimById', arguments);
/**
* A subscriber successfully received new topic content, update subscription.
- * @param {*} dbCtx
- * @param {String} callback
- * @param {*} topicId
+ * @param {*} dbCtx db context
+ * @param {string} callback subscriber callback url
+ * @param {*} topicId topic id
*/
async subscriptionDeliveryComplete(dbCtx, callback, topicId) {
this._notImplemented('subscriptionDeliveryComplete', arguments);
/**
* A subscriber denied new topic content, remove subscription.
- * @param {*} dbCtx
- * @param {String} callback
- * @param {*} topicId
+ * @param {*} dbCtx db context
+ * @param {string} callback subscriber callback url
+ * @param {*} topicId topic id
*/
async subscriptionDeliveryGone(dbCtx, callback, topicId) {
this._notImplemented('subscriptionDeliveryGone', arguments);
/**
* An attempt to deliver content to a subscriber did not complete, update delivery accordingly.
- * @param {*} dbCtx
- * @param {String} callback
- * @param {*} topicId
- * @param {Number[]} retryDelays
+ * @param {*} dbCtx db context
+ * @param {string} callback subscriber callback url
+ * @param {*} topicId topic id
+ * @param {number[]} retryDelays list of retry delays
*/
async subscriptionDeliveryIncomplete(dbCtx, callback, topicId, retryDelays) {
this._notImplemented('subscriptionDeliveryIncomplete', arguments);
/**
* Fetch subscription details
- * @param {*} dbCtx
- * @param {String} callback
- * @param {*} topicId
+ * @param {*} dbCtx db context
+ * @param {string} callback subscriber callback url
+ * @param {*} topicId topic id
*/
async subscriptionGet(dbCtx, callback, topicId) {
this._notImplemented('subscriptionGet', arguments);
/**
* Fetch subscription details
- * @param {*} dbCtx
- * @param {*} subscriptionId
+ * @param {*} dbCtx db context
+ * @param {*} subscriptionId subscription id
*/
async subscriptionGetById(dbCtx, subscriptionId) {
this._notImplemented('subscriptionGetById', arguments);
/**
* Set subscription details
- * @param {*} dbCtx
- * @param {Object} data
- * @param {String} data.callback
- * @param {*} data.topicId
- * @param {Number} data.leaseSeconds
- * @param {String=} data.secret
- * @param {String=} data.httpRemoteAddr
- * @param {String=} data.httpFrom
+ * @param {*} dbCtx db context
+ * @param {object} data subscription data
+ * @param {string} data.callback subscriber callback url
+ * @param {*} data.topicId topic id
+ * @param {number} data.leaseSeconds lease seconds
+ * @param {string=} data.secret secret
+ * @param {string=} data.httpRemoteAddr subscriber info
+ * @param {string=} data.httpFrom subscriber info
*/
async subscriptionUpsert(dbCtx, data) {
this._notImplemented('subscriptionUpsert', arguments);
/**
* Set some subscription fields
- * @param {*} dbCtx
- * @param {Object} data
- * @param {*} data.subscriptionId
- * @param {String} data.signatureAlgorithm
+ * @param {*} dbCtx db context
+ * @param {object} data subscription data
+ * @param {*} data.subscriptionId subscription id
+ * @param {string} data.signatureAlgorithm signature algorithm
*/
async subscriptionUpdate(dbCtx, data) {
this._notImplemented('subscriptionUpdate', arguments);
/**
* Sets the isDeleted flag on a topic, and reset update time.
- * @param {*} txCtx
- * @param {*} topicId
+ * @param {*} dbCtx db context
+ * @param {*} topicId topic id
*/
async topicDeleted(dbCtx, topicId) {
this._notImplemented('topicDeleted', arguments);
/**
* Claim topics to fetch updates for, from available.
- * @param {*} dbCtx
+ * @param {*} dbCtx db context
* @param {Integer} wanted maximum topic fetches to claim
* @param {Integer} claimTimeoutSeconds age of claimed topics to reclaim
- * @param {String} claimant node id claiming these fetches
+ * @param {string} claimant node id claiming these fetches
*/
async topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
this._notImplemented('topicFetchClaim', arguments);
/**
* Claim a topic to update.
- * @param {*} dbCtx
- * @param {*} topicId
+ * @param {*} dbCtx db context
+ * @param {*} topicId topic id
* @param {Integer} claimTimeoutSeconds age of claimed topics to reclaim
- * @param {String} claimant node id claiming these fetches
+ * @param {string} claimant node id claiming these fetches
*/
async topicFetchClaimById(dbCtx, topicId, claimTimeoutSeconds, claimant) {
this._notImplemented('topicFetchClaim', arguments);
/**
* Reset publish state, and reset deliveries for subscribers.
- * @param {*} dbCtx
- * @param {*} topicId
+ * @param {*} dbCtx db context
+ * @param {*} topicId topic id
*/
async topicFetchComplete(dbCtx, topicId) {
this._notImplemented('topicFetchComplete', arguments);
/**
* Bump count of attempts and release claim on update.
- * @param {*} dbCtx
- * @param {*} topicId
- * @param {Number[]} retryDelays
+ * @param {*} dbCtx db context
+ * @param {*} topicId topic id
+ * @param {number[]} retryDelays retry delays
*/
async topicFetchIncomplete(dbCtx, topicId, retryDelays) {
this._notImplemented('topicFetchIncomplete', arguments);
/**
* Set a topic as ready to be checked for an update.
- * @param {*} dbCtx
- * @param {*} topicId
- * @returns {Boolean}
+ * @param {*} dbCtx db context
+ * @param {*} topicId topic id
*/
async topicFetchRequested(dbCtx, topicId) {
this._notImplemented('topicPublish', arguments);
/**
* Get all data for all topics, including subscription count.
- * @param {*} dbCtx
+ * @param {*} dbCtx db context
*/
async topicGetAll(dbCtx) {
this._notImplemented('topicGetAll', arguments);
}
+
/**
* Get topic data, without content.
- * @param {*} dbCtx
- * @param {String} topicUrl
+ * @param {*} dbCtx db context
+ * @param {string} topicUrl topic url
+ * @param {boolean} applyDefaults merge defaults into result
*/
- async topicGetByUrl(dbCtx, topicUrl) {
+ async topicGetByUrl(dbCtx, topicUrl, applyDefaults = true) {
this._notImplemented('topicGetByUrl', arguments);
}
/**
* Get topic data, without content.
- * @param {*} dbCtx
- * @param {*} topicId
- * @param {Boolean} applyDefaults
+ * @param {*} dbCtx db context
+ * @param {*} topicId topic id
+ * @param {boolean} applyDefaults merge defaults into result
*/
async topicGetById(dbCtx, topicId, applyDefaults = true) {
this._notImplemented('topicGetById', arguments);
/**
* Returns topic data with content.
- * @param {*} dbCx
- * @param {*} topicId
+ * @param {*} dbCtx db context
+ * @param {*} topicId topic id
*/
- async topicGetContentById(dbCx, topicId) {
+ async topicGetContentById(dbCtx, topicId) {
this._notImplemented('topicGetContentById', arguments);
}
- // /**
- // * Call after an unsubscribe, to check if a topic is awaiting deletion, and that
- // * was the last subscription belaying it.
- // * @param {String|Integer} data topic url or id
- // */
- // async topicPendingDelete(dbCtx, data) {
- // this._notImplemented('topicPendingDelete', arguments);
- // }
+ /**
+ * Attempt to delete a topic, which must be set isDeleted, if there
+ * are no more subscriptions belaying its removal.
+ * @param {*} dbCtx db context
+ * @param {*} topicId topic id
+ */
+ async topicPendingDelete(dbCtx, topicId) {
+ this._notImplemented('topicPendingDelete', arguments);
+ }
+
+
+ /**
+ * Return an array of the counts of the last #days of topic updates.
+ * @param {*} dbCtx db context
+ * @param {*} topicId topic id
+ * @param {number} days days back to count
+ * @returns {number[]} updates in last days
+ */
+ async topicPublishHistory(dbCtx, topicId, days) {
+ this._notImplemented('topicPublishHistory', arguments);
+ }
+
+ /**
+ * @alias {object} TopicData
+ */
/**
* Create or update the basic parameters of a topic.
- * @param {*} dbCtx
- * @param {TopicData} data
+ * @param {*} dbCtx db context
+ * @param {TopicData} data topic data
*/
async topicSet(dbCtx, data) {
this._notImplemented('topicSet', arguments);
/**
* Updates a topic's content data and content update timestamp.
- * @param {Object} data
- * @param {Integer} data.topicId
- * @param {String} data.content
- * @param {String} data.contentHash
- * @param {String=} data.contentType
+ * @param {*} dbCtx db context
+ * @param {object} data topic data
+ * @param {*} data.topicId topic id
+ * @param {string} data.content content
+ * @param {string} data.contentHash content hash
+ * @param {string=} data.contentType content-type
+ * @param {string=} data.eTag etag header
+ * @param {string=} data.lastModified last modified header
*/
async topicSetContent(dbCtx, data) {
this._notImplemented('topicSetContent', arguments);
/**
* Set some topic fields.
- * @param {*} dbCtx
- * @param {Object} data
- * @param {*} data.topicId
- * @param {Number=} data.leaseSecondsPreferred
- * @param {Number=} data.leaseSecondsMin
- * @param {Number=} data.leaseSecondsMax
- * @param {String=} data.publisherValidationUrl
- * @param {String=} data.contentHashAlgorithm
+ * @param {*} dbCtx db context
+ * @param {object} data topic data
+ * @param {*} data.topicId topic id
+ * @param {number=} data.leaseSecondsPreferred preferred topic lease seconds
+ * @param {number=} data.leaseSecondsMin min lease seconds
+ * @param {number=} data.leaseSecondsMax max lease seconds
+ * @param {string=} data.publisherValidationUrl publisher validation url
+ * @param {string=} data.contentHashAlgorithm content hash algorithm
*/
async topicUpdate(dbCtx, data) {
this._notImplemented('topicUpdate', arguments);
}
+ /**
+ * @alias {object} Verification
+ */
/**
* Claim pending verifications for attempted resolution.
- * @param {*} dbCtx
+ * @param {*} dbCtx db context
* @param {Integer} wanted maximum verifications to claim
* @param {Integer} claimTimeoutSeconds age of claimed verifications to reclaim
+ * @param {*} claimant worker claiming processing
* @returns {Verification[]} array of claimed verifications
*/
async verificationClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
/**
* Claim a specific verification by id, if no other similar verification claimed.
- * @param {*} dbCtx
- * @param {*} verificationId
- * @param {Number} claimTimeoutSeconds
- * @param {String} claimant
+ * @param {*} dbCtx db context
+ * @param {*} verificationId verification id
+ * @param {number} claimTimeoutSeconds claim duration
+ * @param {string} claimant worker claiming processing
*/
async verificationClaimById(dbCtx, verificationId, claimTimeoutSeconds, claimant) {
this._notImplemented('verificationClaimById', arguments);
/**
- * Remove the verification, any older
- * verifications for that same client/topic, and the claim.
- * @param {*} dbCtx
- * @param {*} verificationId
- * @param {String} callback
- * @param {*} topicId
+ * Remove the verification, any older verifications for that same client/topic,
+ * and remove the claim.
+ * @param {*} dbCtx db context
+ * @param {*} verificationId verification id
+ * @param {string} callback subscriber callback url
+ * @param {*} topicId topic id
*/
async verificationComplete(dbCtx, verificationId, callback, topicId) {
this._notImplemented('verificationComplete', arguments);
/**
* Get verification data.
- * @param {*} dbCtx
- * @param {*} verificationId
+ * @param {*} dbCtx db context
+ * @param {*} verificationId verification id
*/
async verificationGetById(dbCtx, verificationId) {
this._notImplemented('verificationGetById', arguments);
/**
* Update database that a client verification was unable to complete.
* This releases the delivery claim and reschedules for some future time.
- * @param {*} dbCtx
- * @param {String} callback client callback url
- * @param {*} topicId internal topic id
- * @param {Number[]} retryDelays
+ * @param {*} dbCtx db context
+ * @param {*} verificationId verification id
+ * @param {number[]} retryDelays retry delays
*/
async verificationIncomplete(dbCtx, verificationId, retryDelays) {
this._notImplemented('verificationIncomplete', arguments);
}
+ /**
+ * @alias {object} VerificationData
+ */
/**
* Create a new pending verification.
- * @param {*} dbCtx
- * @param {VerificationData} data
- * @param {Boolean} claim
+ * @param {*} dbCtx db context
+ * @param {VerificationData} verification verification data
* @returns {*} verificationId
*/
async verificationInsert(dbCtx, verification) {
/**
* Relinquish the claim on a verification, without any other updates.
- * @param {*} dbCtx
- * @param {String} callback client callback url
- * @param {*} topicId internal topic id
+ * @param {*} dbCtx db context
+ * @param {*} verificationId verification id
*/
async verificationRelease(dbCtx, verificationId) {
this._notImplemented('verificationRelease', arguments);
/**
* Updates some fields of an existing (presumably claimed) verification.
- * @param {*} dbCtx
- * @param {*} verificationId
- * @param {Object} data
- * @param {String} data.mode
- * @param {String} data.reason
- * @param {Boolean} data.isPublisherValidated
+ * @param {*} dbCtx db context
+ * @param {*} verificationId verification id
+ * @param {object} data verification data
+ * @param {string} data.mode mode
+ * @param {string} data.reason reason
+ * @param {boolean} data.isPublisherValidated publisher validation result
*/
async verificationUpdate(dbCtx, verificationId, data) {
this._notImplemented('verificationUpdate', arguments);
/**
* Sets the isPublisherValidated flag on a verification and resets the delivery
- * @param {*} dbCtx
- * @param {*} verificationId
+ * @param {*} dbCtx db context
+ * @param {*} verificationId verification id
*/
async verificationValidated(dbCtx, verificationId) {
this._notImplemented('verificationValidated', arguments);