const svh = require('../schema-version-helper');
const Database = require('../base');
const DBErrors = require('../errors');
+const Listener = require('./listener');
const common = require('../../common');
const _fileScope = common.fileScope(__filename);
},
max: {
major: 1,
- minor: 0,
+ minor: 1,
patch: 0,
},
};
// Suppress QF warnings when running tests
this.noWarnings = options.db.noWarnings;
+ if (options.db.cacheEnabled) {
+ this.listener = new Listener(logger, this.db, {
+ ...options.db.listener,
+ channel: 'topic_changed',
+ dataCallback: this._topicChanged.bind(this),
+ connectionEstablishedCallback: this._listenerEstablished.bind(this),
+ connectionLostCallback: this._listenerLost.bind(this),
+ });
+ }
+
// Log queries
const queryLogLevel = options.db.queryLogLevel;
if (queryLogLevel) {
pgpInitOptions.query = (event) => {
- this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event, ['query', 'params']) });
+ // Quell outgoing pings
+ if (event?.query?.startsWith('NOTIFY')) {
+ return;
+ }
+ this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event || {}, ['query', 'params']) });
};
}
// Log errors
pgpInitOptions.error = (err, event) => {
this.logger.error(_fileScope('pgp:error'), '', { err, event });
+
+ // TODO: close connection on err.code === '57P03' database shutting down
};
// Deophidiate column names in-place, log results
- pgpInitOptions.receive = (data, result, event) => {
+ pgpInitOptions.receive = ({ data, result, ctx: event }) => {
const exemplaryRow = data[0];
for (const prop in exemplaryRow) {
const camel = Database._camelfy(prop);
}
}
if (queryLogLevel) {
+ // Quell outgoing pings
+ if (result && result.command === 'NOTIFY') {
+ return;
+ }
// Omitting .rows
- const resultLog = common.pick(result, ['command', 'rowCount', 'duration']);
+ const resultLog = common.pick(result || {}, ['command', 'rowCount', 'duration']);
this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog });
}
};
_queryFileHelper(_pgp) {
return (file) => {
const _scope = _fileScope('_queryFile');
+ /* istanbul ignore next */
const qfParams = {
minify: true,
...(this.noWarnings && { noWarnings: this.noWarnings }),
await this._initTables();
}
await super.initialize();
+ if (this.listener) {
+ await this.listener.start();
+ }
}
this.logger.debug(_scope, 'schema migrations wanted', { migrationsWanted });
for (const v of migrationsWanted) {
const fPath = path.join(__dirname, 'sql', 'schema', v, 'apply.sql');
- const migrationSql = _queryFile(fPath);
- const results = await this.db.multiResult(migrationSql);
- this.logger.debug(_scope, 'executed migration sql', { version: v, results });
- this.logger.info(_scope, 'applied migration', { version: v });
+ try {
+ const migrationSql = _queryFile(fPath);
+ this.logger.debug(_scope, 'applying migration', { version: v });
+ const results = await this.db.multiResult(migrationSql);
+ this.logger.debug(_scope, 'migration results', { results });
+ this.logger.info(_scope, 'applied migration', { version: v });
+ } catch (e) {
+ this.logger.error(_scope, 'migration failed', { error: e, fPath, version: v });
+ throw e;
+ }
}
}
async _closeConnection() {
const _scope = _fileScope('_closeConnection');
try {
+ if (this.listener) {
+ await this.listener.stop();
+ }
await this._pgp.end();
} catch (e) {
this.logger.error(_scope, 'failed', { error: e });
}
+ /* istanbul ignore next */
async _purgeTables(really = false) {
const _scope = _fileScope('_purgeTables');
try {
}
+ /**
+ * Receive notices when topic entry is updated.
+ * Clear relevant cache entry.
+ * @param {string} payload topic changed event
+ */
+ _topicChanged(payload) {
+ const _scope = _fileScope('_topicChanged');
+ if (payload !== 'ping') {
+ this.logger.debug(_scope, 'called', { payload });
+ this.cache.delete(payload);
+ }
+ }
+
+
+ /**
+ * Called when a listener connection is opened.
+ * Enable cache.
+ */
+ _listenerEstablished() {
+ const _scope = _fileScope('_listenerEstablished');
+ this.logger.debug(_scope, 'called', {});
+ this.cache = new Map();
+ }
+
+
+ /**
+ * Called when a listener connection is closed.
+ * Disable cache.
+ */
+ _listenerLost() {
+ const _scope = _fileScope('_listenerLost');
+ this.logger.debug(_scope, 'called', {});
+ delete this.cache;
+ }
+
+
+ /**
+ * Return a cached entry, if available.
+ * @param {*} key key
+ * @returns {object=} cached data
+ */
+ _cacheGet(key) {
+ const _scope = _fileScope('_cacheGet');
+ if (this.cache?.has(key)) {
+ const cacheEntry = this.cache.get(key);
+ this.logger.debug(_scope, 'found cache entry', { key, ...common.pick(cacheEntry, ['added', 'hits', 'lastHit']) });
+ cacheEntry.hits += 1;
+ cacheEntry.lastHit = new Date();
+ return cacheEntry.data;
+ }
+ }
+
+
+ /**
+ * Store an entry in cache, if available.
+ * @param {*} key key
+ * @param {*} data data
+ */
+ _cacheSet(key, data) {
+ const _scope = _fileScope('_cacheSet');
+ if (this.cache) {
+ this.cache.set(key, {
+ added: new Date(),
+ hits: 0,
+ lastHit: undefined,
+ data,
+ });
+ this.logger.debug(_scope, 'added cache entry', { key });
+ }
+ }
+
+
async context(fn) {
return this.db.task(async (t) => fn(t));
}
}
- async authenticationUpsert(dbCtx, identifier, credential) {
+ async authenticationUpsert(dbCtx, identifier, credential, otpKey) {
const _scope = _fileScope('authenticationUpsert');
const scrubbedCredential = '*'.repeat((credential || '').length);
- this.logger.debug(_scope, 'called', { identifier, scrubbedCredential });
+ const scrubbedOTPKey = '*'.repeat((otpKey || '').length) || null;
+ this.logger.debug(_scope, 'called', { identifier, scrubbedCredential, scrubbedOTPKey });
let result;
try {
- result = await dbCtx.result(this.statement.authenticationUpsert, { identifier, credential });
+ result = await dbCtx.result(this.statement.authenticationUpsert, { identifier, credential, otpKey });
if (result.rowCount != 1) {
throw new DBErrors.UnexpectedResult('did not upsert authentication');
}
} catch (e) {
- this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential })
+ this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential, scrubbedOTPKey });
+ throw e;
+ }
+ }
+
+
+ async authenticationUpdateCredential(dbCtx, identifier, credential) {
+ const _scope = _fileScope('authenticationUpdateCredential');
+ const scrubbedCredential = '*'.repeat((credential || '').length);
+ this.logger.debug(_scope, 'called', { identifier, scrubbedCredential });
+
+ let result;
+ try {
+ result = await dbCtx.result(this.statement.authenticationUpdateCredential, { identifier, credential });
+ if (result.rowCount != 1) {
+ throw new DBErrors.UnexpectedResult('did not update authentication credential');
+ }
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedCredential });
+ throw e;
+ }
+ }
+
+
+ async authenticationUpdateOTPKey(dbCtx, identifier, otpKey) {
+ const _scope = _fileScope('authenticationUpdateOTPKey');
+ const scrubbedOTPKey = '*'.repeat((otpKey || '').length) || null;
+ this.logger.debug(_scope, 'called', { identifier, scrubbedOTPKey });
+
+ let result;
+ try {
+ result = await dbCtx.result(this.statement.authenticationUpdateOtpKey, { identifier, otpKey });
+ if (result.rowCount != 1) {
+ throw new DBErrors.UnexpectedResult('did not update authentication otp key');
+ }
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, identifier, scrubbedOTPKey });
throw e;
}
}
}
+ async subscriptionDeleteExpired(dbCtx, topicId) {
+ const _scope = _fileScope('subscriptionDeleteExpired');
+ this.logger.debug(_scope, 'called', { topicId });
+
+ try {
+ const result = await dbCtx.result(this.statement.subscriptionDeleteExpired, { topicId });
+ this.logger.debug(_scope, 'success', { topicId, deleted: result.rowCount });
+ return this._engineInfo(result);
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, topicId });
+ throw e;
+ }
+ }
+
+
async subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, claimant) {
const _scope = _fileScope('subscriptionDeliveryClaim');
this.logger.debug(_scope, 'called', { wanted, claimTimeoutSeconds, claimant });
}
- async subscriptionDeliveryComplete(dbCtx, callback, topicId) {
+ async subscriptionDeliveryComplete(dbCtx, callback, topicId, topicContentUpdated) {
const _scope = _fileScope('subscriptionDeliveryComplete');
- this.logger.debug(_scope, 'called', { callback, topicId });
+ this.logger.debug(_scope, 'called', { callback, topicId, topicContentUpdated });
let result;
try {
await dbCtx.txIf(async (txCtx) => {
- result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId });
+ result = await txCtx.result(this.statement.subscriptionDeliverySuccess, { callback, topicId, topicContentUpdated });
if (result.rowCount != 1) {
throw new DBErrors.UnexpectedResult('did not set subscription delivery success');
}
}
});
} catch (e) {
- this.logger.error(_scope, 'failed', { error: e, callback, topicId });
+ this.logger.error(_scope, 'failed', { error: e, callback, topicId, topicContentUpdated });
throw e;
}
}
let topics;
try {
topics = await dbCtx.manyOrNone(this.statement.topicGetInfoAll);
- } catch (e) {
+ } catch (e) {
this.logger.error(_scope, 'failed', { error: e, topics });
throw e;
}
}
- async topicGetByUrl(dbCtx, topicUrl) {
+ async topicGetByUrl(dbCtx, topicUrl, applyDefaults = true) {
const _scope = _fileScope('topicGetByUrl');
this.logger.debug(_scope, 'called', { topicUrl });
let topic;
try {
topic = await dbCtx.oneOrNone(this.statement.topicGetByUrl, { topicUrl });
- return this._topicDefaults(topic);
+ if (applyDefaults) {
+ topic = this._topicDefaults(topic);
+ }
+ return topic;
} catch (e) {
this.logger.error(_scope, 'failed', { error: e, topic, topicUrl });
throw e;
let topic;
try {
+ topic = this._cacheGet(topicId);
+ if (topic) {
+ return topic;
+ }
topic = await dbCtx.oneOrNone(this.statement.topicGetContentById, { topicId });
- return this._topicDefaults(topic);
+ const topicWithDefaults = this._topicDefaults(topic);
+ this._cacheSet(topicId, topicWithDefaults);
+ return topicWithDefaults;
} catch (e) {
this.logger.error(_scope, 'failed', { error: e, topic, topicId });
throw e;
}
+ async topicPendingDelete(dbCtx, topicId) {
+ const _scope = _fileScope('topicPendingDelete');
+ this.logger.debug(_scope, 'called', { topicId });
+
+ try {
+ await dbCtx.txIf(async (txCtx) => {
+ const topic = await txCtx.one(this.statement.topicGetById, { topicId });
+ if (!topic.isDeleted) {
+ this.logger.debug(_scope, 'topic not set deleted, not deleting', { topicId });
+ return;
+ }
+
+ const { count: subscriberCount } = await txCtx.one(this.statement.subscriptionCountByTopicUrl, { topicUrl: topic.url });
+ if (subscriberCount) {
+ this.logger.debug(_scope, 'topic has subscribers, not deleting', { topicId, subscriberCount });
+ return;
+ }
+
+ const result = await txCtx.result(this.statement.topicDeleteById, { topicId });
+ if (result.rowCount !== 1) {
+ throw new DBErrors.UnexpectedResult('did not delete topic');
+ }
+ });
+ this.logger.debug(_scope, 'success', { topicId });
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e, topicId });
+ throw e;
+ }
+ }
+
+
+ async topicPublishHistory(dbCtx, topicId, days) {
+ const _scope = _fileScope('topicPublishHistory');
+ this.logger.debug(_scope, 'called', { topicId, days });
+
+ const events = await dbCtx.manyOrNone(this.statement.topicPublishHistory, { topicIds: [topicId], daysAgo: days });
+ const history = Array.from({ length: days }, () => 0);
+ events.forEach(({ daysAgo, contentUpdates }) => history[daysAgo] = Number(contentUpdates));
+
+ return history;
+ }
+
+
async topicSet(dbCtx, data) {
const _scope = _fileScope('topicSet');
this.logger.debug(_scope, 'called', data);
const _scope = _fileScope('topicSetContent');
const topicSetContentData = {
contentType: null,
+ httpETag: null,
+ httpLastModified: null,
...data,
};
const logData = {
if (result.rowCount != 1) {
throw new DBErrors.UnexpectedResult('did not set topic content');
}
+ result = await dbCtx.result(this.statement.topicSetContentHistory, {
+ topicId: data.topicId,
+ contentHash: data.contentHash,
+ contentSize: data.content.length,
+ });
+ if (result.rowCount != 1) {
+ throw new DBErrors.UnexpectedResult('did not set topic content history');
+ }
this.logger.debug(_scope, 'success', { ...logData });
return this._engineInfo(result);
} catch (e) {