max: {
major: 1,
minor: 0,
- patch: 0,
+ patch: 1,
},
};
}
+ /**
+ * Converts engine subscription fields to native types.
+ * @param {Object} data
+ */
+ static _subscriptionDataToNative(data) {
+ const epochToDate = (epoch) => new Date(Number(epoch) * 1000);
+ if (data) {
+ ['created', 'verified', 'expires', 'contentDelivered'].forEach((field) => {
+ // eslint-disable-next-line security/detect-object-injection
+ data[field] = epochToDate(data[field]);
+ });
+ }
+ return data;
+ }
+
+
subscriptionsByTopicId(dbCtx, topicId) {
const _scope = _fileScope('subscriptionsByTopicId');
this.logger.debug(_scope, 'called', { topicId });
try {
- return this.statement.subscriptionsByTopicId.all({ topicId });
+ const subscriptions = this.statement.subscriptionsByTopicId.all({ topicId });
+ return subscriptions.map((s) => DatabaseSQLite._subscriptionDataToNative(s));
} catch (e) {
this.logger.error(_scope, 'failed', { error: e, topicId });
throw e;
}
+ subscriptionDeleteExpired(dbCtx, topicId) {
+ const _scope = _fileScope('subscriptionDeleteExpired');
+ this.logger.debug(_scope, 'called', { topicId });
+
+ try {
+ const result = this.statement.subscriptionDeleteExpired.run({ topicId });
+ this.logger.debug(_scope, 'success', { topicId, deleted: result.changes });
+ return this._engineInfo(result);
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, topicId });
+ throw e;
+ }
+ }
+
+
subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
const _scope = _fileScope('subscriptionDeliveryClaim');
this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
let subscription;
try {
subscription = this.statement.subscriptionGet.get({ callback, topicId });
- return subscription;
+ return DatabaseSQLite._subscriptionDataToNative(subscription);
} catch (e) {
this.logger.error(_scope, 'failed', { error: e, callback, topicId });
throw e;
let subscription;
try {
subscription = this.statement.subscriptionGetById.get({ subscriptionId });
- return subscription;
+ return DatabaseSQLite._subscriptionDataToNative(subscription);
} catch (e) {
this.logger.error(_scope, 'failed', { error: e, subscriptionId });
throw e;
}
+ topicPendingDelete(dbCtx, topicId) {
+ const _scope = _fileScope('topicPendingDelete');
+ this.logger.debug(_scope, 'called', { topicId });
+
+ try {
+ this.db.transaction(() => {
+ const topic = this.statement.topicGetById.get({ topicId });
+ if (!topic.isDeleted) {
+ this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
+ return;
+ }
+
+ const { count: subscriberCount } = this.statement.subscriptionCountByTopicUrl.get({ topicUrl: topic.url });
+ if (subscriberCount) {
+ this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
+ return;
+ }
+
+ const result = this.statement.topicDeleteById.run({ topicId });
+ if (result.changes !== 1) {
+ throw new DBErrors.UnexpectedResult('did not delete topic');
+ }
+ })();
+ this.logger.debug(_scope, 'success', { topicId });
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, topicId });
+ throw e;
+ }
+ }
+
+
topicSet(dbCtx, data) {
const _scope = _fileScope('topicSet');
this.logger.debug(_scope, 'called', data);