X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=src%2Fdb%2Fpostgres%2Findex.js;h=3e97336992999adaee4a077c1f7ef9ddc7719a87;hb=737fbd003d5c4dfea81b667ef906f1c106a60612;hp=213fa1031c3c44fd29c48a46a8af86ff8bed6217;hpb=9812213260e952ae601f94ab0915c680e8c80495;p=websub-hub diff --git a/src/db/postgres/index.js b/src/db/postgres/index.js index 213fa10..3e97336 100644 --- a/src/db/postgres/index.js +++ b/src/db/postgres/index.js @@ -30,7 +30,7 @@ const schemaVersionsSupported = { max: { major: 1, minor: 0, - patch: 1, + patch: 3, }, }; @@ -162,10 +162,16 @@ class DatabasePostgres extends Database { this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted }); for (const v of migrationsWanted) { const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql'); - const migrationSql = _queryFile(fPath); - const results = await this.db.multiResult(migrationSql); - this.logger.debug(_scope, 'executed migration sql', { version: v, results }); - this.logger.info(_scope, 'applied migration', { version: v }); + try { + const migrationSql = _queryFile(fPath); + this.logger.debug(_scope, 'applying migration', { version: v }); + const results = await this.db.multiResult(migrationSql); + this.logger.debug(_scope, 'migration results', { results }); + this.logger.info(_scope, 'applied migration', { version: v }); + } catch (e) { + this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v }); + throw e; + } } } @@ -473,14 +479,14 @@ class DatabasePostgres extends Database { } - async subscriptionDeliveryComplete(dbCtx, callback, topicId) { + async subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) { const _scope = _fileScope('subscriptionDeliveryComplete'); - this.logger.debug(_scope, 'called', { callback, topicId }); + this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated }); let result; try { await dbCtx.txIf(async (txCtx) => { - result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId }); + result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId, topicContentUpdated }); if (result.rowCount != 1) { throw new DBErrors.UnexpectedResult('did not set subscription delivery success'); } @@ -490,7 +496,7 @@ class DatabasePostgres extends Database { } }); } catch (e) { - this.logger.error(_scope, 'failed', { error: e, callback, topicId }); + this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated }); throw e; } } @@ -850,6 +856,18 @@ class DatabasePostgres extends Database { } + async topicPublishHistory(dbCtx, topicId, days) { + const _scope = _fileScope('topicPublishHistory'); + this.logger.debug(_scope, 'called', { topicId, days }); + + const events = await dbCtx.manyOrNone(this.statement.topicPublishHistory, { topicIds: [topicId], daysAgo: days }); + const history = Array.from({ length: days }, () => 0); + events.forEach(({ daysAgo, contentUpdates }) => history[daysAgo] = contentUpdates); + + return history; + } + + async topicSet(dbCtx, data) { const _scope = _fileScope('topicSet'); this.logger.debug(_scope, 'called', data); @@ -898,6 +916,10 @@ class DatabasePostgres extends Database { if (result.rowCount != 1) { throw new DBErrors.UnexpectedResult('did not set topic content'); } + result = await dbCtx.result(this.statement.topicSetContentHistory, { topicId: data.topicId, contentHash: data.contentHash, contentSize: data.content.length }); + if (result.rowCount != 1) { + throw new DBErrors.UnexpectedResult('did not set topic content history'); + } this.logger.debug(_scope, 'success', { ...logData }); return this._engineInfo(result); } catch (e) {