max: {
major: 1,
minor: 0,
- patch: 1,
+ patch: 3,
},
};
this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
for (const v of migrationsWanted) {
const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
- const migrationSql = _queryFile(fPath);
- const results = await this.db.multiResult(migrationSql);
- this.logger.debug(_scope, 'executed migration sql', { version: v, results });
- this.logger.info(_scope, 'applied migration', { version: v });
+ try {
+ const migrationSql = _queryFile(fPath);
+ this.logger.debug(_scope, 'applying migration', { version: v });
+ const results = await this.db.multiResult(migrationSql);
+ this.logger.debug(_scope, 'migration results', { results });
+ this.logger.info(_scope, 'applied migration', { version: v });
+ } catch (e) {
+ this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v });
+ throw e;
+ }
}
}
}
+ async subscriptionDeleteExpired(dbCtx, topicId) {
+ const _scope = _fileScope('subscriptionDeleteExpired');
+ this.logger.debug(_scope, 'called', { topicId });
+
+ try {
+ const result = await dbCtx.result(this.statement.subscriptionDeleteExpired, { topicId });
+ this.logger.debug(_scope, 'success', { topicId, deleted: result.rowCount });
+ return this._engineInfo(result);
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, topicId });
+ throw e;
+ }
+ }
+
+
async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
const _scope = _fileScope('subscriptionDeliveryClaim');
this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
}
- async subscriptionDeliveryComplete(dbCtx, callback, topicId) {
+ async subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) {
const _scope = _fileScope('subscriptionDeliveryComplete');
- this.logger.debug(_scope, 'called', { callback, topicId });
+ this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated });
let result;
try {
await dbCtx.txIf(async (txCtx) => {
- result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId });
+ result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId, topicContentUpdated });
if (result.rowCount != 1) {
throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
}
}
});
} catch (e) {
- this.logger.error(_scope, 'failed', { error: e, callback, topicId });
+ this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated });
throw e;
}
}
}
+ async topicPendingDelete(dbCtx, topicId) {
+ const _scope = _fileScope('topicPendingDelete');
+ this.logger.debug(_scope, 'called', { topicId });
+
+ try {
+ await dbCtx.txIf(async (txCtx) => {
+ const topic = await txCtx.one(this.statement.topicGetById, { topicId });
+ if (!topic.isDeleted) {
+ this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
+ return;
+ }
+
+ const { count: subscriberCount } = await txCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl: topic.url });
+ if (subscriberCount) {
+ this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
+ return;
+ }
+
+ const result = await txCtx.result(this.statement.topicDeleteById, { topicId });
+ if (result.rowCount !== 1) {
+ throw new DBErrors.UnexpectedResult('did not delete topic');
+ }
+ });
+ this.logger.debug(_scope, 'success', { topicId });
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, topicId });
+ throw e;
+ }
+ }
+
+
async topicSet(dbCtx, data) {
const _scope = _fileScope('topicSet');
this.logger.debug(_scope, 'called', data);
if (result.rowCount != 1) {
throw new DBErrors.UnexpectedResult('did not set topic content');
}
+ result = await dbCtx.result(this.statement.topicSetContentHistory, { topicId: data.topicId, contentHash: data.contentHash, contentSize: data.content.length });
+ if (result.rowCount != 1) {
+ throw new DBErrors.UnexpectedResult('did not set topic content history');
+ }
this.logger.debug(_scope, 'success', { ...logData });
return this._engineInfo(result);
} catch (e) {