From: Justin Wind Date: Mon, 9 Aug 2021 16:54:43 +0000 (-0700) Subject: Merge branch 'v1.1-dev' as v1.1.0 X-Git-Tag: v1.1.0 X-Git-Url: https://git.squeep.com/?a=commitdiff_plain;h=38aba0869dc3ade99d439e74cbc6239b4fa1f632;hp=9696c012e6b9a6c58904baa397ca0ebf78112316;p=websub-hub Merge branch 'v1.1-dev' as v1.1.0 --- diff --git a/.eslintrc.json b/.eslintrc.json index 1a60b64..e6c6f89 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -55,6 +55,13 @@ "error", "last" ], + "indent": [ + "warn", + 2, + { + "SwitchCase": 1 + } + ], "sonarjs/cognitive-complexity": "warn", "sonarjs/no-duplicate-string": "warn", "keyword-spacing": "error", diff --git a/.markdownlint.json b/.markdownlint.json new file mode 100644 index 0000000..18e3c71 --- /dev/null +++ b/.markdownlint.json @@ -0,0 +1,4 @@ +{ + "MD013": false, + "MD024": false +} diff --git a/CHANGELOG.md b/CHANGELOG.md index d299b62..e12d6d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,13 +4,21 @@ Releases and notable changes to this project are documented here. ## [Unreleased] +## [v1.1.0] - 2021-08-08 + +### Added + +- Caching of topic contents for Postfix database backends. This should greatly reduce the db load when many subscribers to a topic are delivered an update. +- Minor cleanup to generated HTML pages. + ## [v1.0.0] - 2021-08-01 ### Added -Everything. MVP first stable release. +- Everything. MVP first stable release. --- -[Unreleased]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=HEAD;hp=v1.0.0 +[Unreleased]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=HEAD;hp=v1.1.0 +[v1.1.0]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=v1.1.0;hp=v1.0.0 [v1.0.0]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=v1.0.0;hp=v0.0.0 \ No newline at end of file diff --git a/bin/authUserAdd.js b/bin/authUserAdd.js index 4eaff65..1c33be8 100644 --- a/bin/authUserAdd.js +++ b/bin/authUserAdd.js @@ -44,6 +44,7 @@ async function readPassword(prompt) { } (async () => { + await db.initialize(); const password = await readPassword('password: '); const credential = await argon2.hash(password, { type: argon2.argon2id }); console.log(`\t${identifier}:${credential}`); diff --git a/config/default.js b/config/default.js index 5c7091f..1905315 100644 --- a/config/default.js +++ b/config/default.js @@ -22,6 +22,12 @@ const defaultOptions = { db: { connectionString: '', // e.g. sqlite://path/to/dbfile.sqlite queryLogLevel: undefined, // Set to log queries + cacheEnabled: true, // Cache some db responses. (Postgres only) + listener: { // Settings for the cache-invalidator connection. (Postgres only) + // pingDelayMs: 5000, // Connection keep-alive/health-check. + // reconnectDelayMs: 6000, // Wait time before attempting reconnection. + // reconnectTimes: 10, // Retries limit. + }, }, // Logging options diff --git a/config/test.js b/config/test.js index 091b6c0..3ff1259 100644 --- a/config/test.js +++ b/config/test.js @@ -8,5 +8,6 @@ module.exports = { }, db: { queryLogLevel: 'debug', + cacheEnabled: false, }, }; diff --git a/package-lock.json b/package-lock.json index 410c0aa..401fffd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -572,18 +572,13 @@ "dev": true }, "@squeep/api-dingus": { - "version": "git+https://git.squeep.com/squeep-api-dingus/#16db6709ab8407b1f696e3d5f92aa6980f182f39", - "from": "git+https://git.squeep.com/squeep-api-dingus/#v1.0.0", + "version": "git+https://git.squeep.com/squeep-api-dingus/#12b96f53e7976b74296c1e024432b88749e6c4b0", + "from": "git+https://git.squeep.com/squeep-api-dingus/#v1.1-dev", "requires": { "mime-db": "^1.49.0", "uuid": "^8.3.2" }, "dependencies": { - "mime-db": { - "version": "1.49.0", - "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.49.0.tgz", - "integrity": "sha512-CIc8j9URtOVApSFCQIF+VBkX1RwXp/oMMOrqdyXSBXq5RWNEsRfyj1kiRnQgmNXmHxPoFIxOroKA3zcU9P+nAA==" - }, "uuid": { "version": "8.3.2", "resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz", @@ -2178,6 +2173,11 @@ } } }, + "mime-db": { + "version": "1.49.0", + "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.49.0.tgz", + "integrity": "sha512-CIc8j9URtOVApSFCQIF+VBkX1RwXp/oMMOrqdyXSBXq5RWNEsRfyj1kiRnQgmNXmHxPoFIxOroKA3zcU9P+nAA==" + }, "mimic-response": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/mimic-response/-/mimic-response-2.1.0.tgz", @@ -3438,9 +3438,9 @@ } }, "tar": { - "version": "6.1.0", - "resolved": "https://registry.npmjs.org/tar/-/tar-6.1.0.tgz", - "integrity": "sha512-DUCttfhsnLCjwoDoFcI+B2iJgYa93vBnDUATYEeRx6sntCTdN01VnqsIuTlALXla/LWooNg0yEGeB+Y8WdFxGA==", + "version": "6.1.6", + "resolved": "https://registry.npmjs.org/tar/-/tar-6.1.6.tgz", + "integrity": "sha512-oaWyu5dQbHaYcyZCTfyPpC+VmI62/OM2RTUYavTk1MDr1cwW5Boi3baeYQKiZbY2uSQJGr+iMOzb/JFxLrft+g==", "requires": { "chownr": "^2.0.0", "fs-minipass": "^2.0.0", diff --git a/package.json b/package.json index 3be524d..d488868 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "websub-hub", - "version": "1.0.0", + "version": "1.1.0", "description": "A WebSub Hub server implementation.", "main": "server.js", "scripts": { @@ -32,7 +32,7 @@ "coverage-check" ], "dependencies": { - "@squeep/api-dingus": "git+https://git.squeep.com/squeep-api-dingus/#v1.0.0", + "@squeep/api-dingus": "git+https://git.squeep.com/squeep-api-dingus/#v1.1.0", "@squeep/web-linking": "git+https://git.squeep.com/squeep-web-linking/#v1.0.0", "argon2": "^0.28.2", "axios": "^0.21.1", diff --git a/server.js b/server.js index 985e575..7658097 100644 --- a/server.js +++ b/server.js @@ -19,7 +19,7 @@ const ADDR = process.env.LISTEN_ADDR || '127.0.0.1'; config = new Config(process.env.NODE_ENV); logger = new Logger(config); db = new DB(logger, config); - await db.schemaCheck(); + await db.initialize(); service = new Service(logger, db, config); http.createServer((req, res) => { diff --git a/src/authenticator.js b/src/authenticator.js index ed4fc40..7053b56 100644 --- a/src/authenticator.js +++ b/src/authenticator.js @@ -103,7 +103,7 @@ class Authenticator { const authData = req.getHeader(Enum.Header.Authorization); if (authData && await this.isValidAuthorization(authData, ctx)) { - return true; + return true; } return this.requestBasic(res); } diff --git a/src/common.js b/src/common.js index b82b13e..0cacc3b 100644 --- a/src/common.js +++ b/src/common.js @@ -84,7 +84,7 @@ const topicLeaseDefaults = () => { * @param {Number} jitter * @returns {Number} */ - const attemptRetrySeconds = (attempt, retryBackoffSeconds = [60, 120, 360, 1440, 7200, 43200, 86400], jitter = 0.618) => { +const attemptRetrySeconds = (attempt, retryBackoffSeconds = [60, 120, 360, 1440, 7200, 43200, 86400], jitter = 0.618) => { const maxAttempt = retryBackoffSeconds.length - 1; if (typeof attempt !== 'number' || attempt < 0) { attempt = 0; @@ -103,7 +103,7 @@ const topicLeaseDefaults = () => { * @param {Array} array * @param {Number} per */ - const arrayChunk = (array, per = 1) => { +const arrayChunk = (array, per = 1) => { const nChunks = Math.ceil(array.length / per); return Array.from(Array(nChunks), (_, i) => array.slice(i * per, (i + 1) * per)); } @@ -114,7 +114,7 @@ const topicLeaseDefaults = () => { * @param {Array} dst * @param {Array} src */ - const stackSafePush = (dst, src) => { +const stackSafePush = (dst, src) => { const jsEngineMaxArguments = 2**16; // Current as of Node 12 arrayChunk(src, jsEngineMaxArguments).forEach((items) => { Array.prototype.push.apply(dst, items); diff --git a/src/db/base.js b/src/db/base.js index 8a1df74..21e2664 100644 --- a/src/db/base.js +++ b/src/db/base.js @@ -76,21 +76,21 @@ class Database { * @param {String} method * @param {arguments} args */ - _notImplemented(method, args) { + _notImplemented(method, args) { this.logger.error(_fileScope(method), 'abstract method called', Array.from(args)); throw new DBErrors.NotImplemented(method); } /** - * Validate schema compatibility. - * Ensure this is called immediately after instantiating a DB instance, - * as some engines also finish initialization and validation here, which - * was easier than wrangling async calls in constructor. - * In light of this behavior, this method could be named better. - */ - async schemaCheck() { - const _scope = _fileScope('schemaCheck'); + * Perform tasks needed to prepare database for use. Ensure this is called + * after construction, and before any other database activity. + * At the minimum, this will validate a compatible schema is present and usable. + * Some engines will also perform other initializations or async actions which + * are easier handled outside the constructor. + */ + async initialize() { + const _scope = _fileScope('initialize'); const currentSchema = await this._currentSchema(); const current = svh.schemaVersionObjectToNumber(currentSchema); @@ -416,7 +416,7 @@ class Database { * @param {String} callback * @param {*} topicId */ - async subscriptionGet(dbCtx, callback, topicId) { + async subscriptionGet(dbCtx, callback, topicId) { this._notImplemented('subscriptionGet', arguments); } @@ -442,7 +442,7 @@ class Database { * @param {String=} data.httpRemoteAddr * @param {String=} data.httpFrom */ - async subscriptionUpsert(dbCtx, data) { + async subscriptionUpsert(dbCtx, data) { this._notImplemented('subscriptionUpsert', arguments); } @@ -520,7 +520,7 @@ class Database { * @param {*} topicId * @returns {Boolean} */ - async topicFetchRequested(dbCtx, topicId) { + async topicFetchRequested(dbCtx, topicId) { this._notImplemented('topicPublish', arguments); } @@ -679,7 +679,7 @@ class Database { * @param {Boolean} claim * @returns {*} verificationId */ - async verificationInsert(dbCtx, verification) { + async verificationInsert(dbCtx, verification) { this._notImplemented('verificationInsert', arguments); } @@ -704,9 +704,9 @@ class Database { * @param {String} data.reason * @param {Boolean} data.isPublisherValidated */ - async verificationUpdate(dbCtx, verificationId, data) { + async verificationUpdate(dbCtx, verificationId, data) { this._notImplemented('verificationUpdate', arguments); - } + } /** diff --git a/src/db/postgres/index.js b/src/db/postgres/index.js index f4f690a..1c5d1d1 100644 --- a/src/db/postgres/index.js +++ b/src/db/postgres/index.js @@ -10,6 +10,7 @@ const pgp = require('pg-promise')(pgpInitOptions); const svh = require('../schema-version-helper'); const Database = require('../base'); const DBErrors = require('../errors'); +const Listener = require('./listener'); const common = require('../../common'); const _fileScope = common.fileScope(__filename); @@ -43,10 +44,23 @@ class DatabasePostgres extends Database { // Suppress QF warnings when running tests this.noWarnings = options.db.noWarnings; + if (options.db.cacheEnabled) { + this.listener = new Listener(logger, this.db, Object.assign({}, options.db.listener, { + channel: 'topic_changed', + dataCallback: this._topicChanged.bind(this), + connectionEstablishedCallback: this._listenerEstablished.bind(this), + connectionLostCallback: this._listenerLost.bind(this), + })); + } + // Log queries const queryLogLevel = options.db.queryLogLevel; if (queryLogLevel) { pgpInitOptions.query = (event) => { + // Quell outgoing pings + if (event && event.query && event.query.startsWith('NOTIFY')) { + return; + } this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event, ['query', 'params']) }); }; } @@ -69,6 +83,10 @@ class DatabasePostgres extends Database { } } if (queryLogLevel) { + // Quell outgoing pings + if (result && result.command === 'NOTIFY') { + return; + } // Omitting .rows const resultLog = common.pick(result, ['command', 'rowCount', 'duration']); this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog }); @@ -86,6 +104,7 @@ class DatabasePostgres extends Database { _queryFileHelper(_pgp) { return (file) => { const _scope = _fileScope('_queryFile'); + /* istanbul ignore next */ const qfParams = { minify: true, ...(this.noWarnings && { noWarnings: this.noWarnings }), @@ -100,13 +119,16 @@ class DatabasePostgres extends Database { } - async schemaCheck(applyMigrations = true) { - const _scope = _fileScope('schemaCheck'); + async initialize(applyMigrations = true) { + const _scope = _fileScope('initialize'); this.logger.debug(_scope, 'called', { applyMigrations }); if (applyMigrations) { await this._initTables(); } - await super.schemaCheck(); + await super.initialize(); + if (this.listener) { + await this.listener.start(); + } } @@ -173,6 +195,9 @@ class DatabasePostgres extends Database { async _closeConnection() { const _scope = _fileScope('_closeConnection'); try { + if (this.listener) { + await this.listener.stop(); + } await this._pgp.end(); } catch (e) { this.logger.error(_scope, 'failed', { error: e }); @@ -181,6 +206,7 @@ class DatabasePostgres extends Database { } + /* istanbul ignore next */ async _purgeTables(really = false) { const _scope = _fileScope('_purgeTables'); try { @@ -219,6 +245,77 @@ class DatabasePostgres extends Database { } + /** + * Receive notices when topic entry is updated. + * Clear relevant cache entry. + * @param {String} payload + */ + _topicChanged(payload) { + const _scope = _fileScope('_topicChanged'); + if (payload !== 'ping') { + this.logger.debug(_scope, 'called', { payload }); + this.cache.delete(payload); + } + } + + + /** + * Called when a listener connection is opened. + * Enable cache. + */ + _listenerEstablished() { + const _scope = _fileScope('_listenerEstablished'); + this.logger.debug(_scope, 'called', {}); + this.cache = new Map(); + } + + + /** + * Called when a listener connection is closed. + * Disable cache. + */ + _listenerLost() { + const _scope = _fileScope('_listenerLost'); + this.logger.debug(_scope, 'called', {}); + delete this.cache; + } + + + /** + * Return a cached entry, if available. + * @param {*} key + */ + _cacheGet(key) { + const _scope = _fileScope('_cacheGet'); + if (this.cache && this.cache.has(key)) { + const cacheEntry = this.cache.get(key); + this.logger.debug(_scope, 'found cache entry', { key, ...common.pick(cacheEntry, ['added', 'hits', 'lastHit']) }); + cacheEntry.hits += 1; + cacheEntry.lastHit = new Date(); + return cacheEntry.data; + } + } + + + /** + * Store an entry in cache, if available. + * @param {*} key + * @param {*} data + */ + _cacheSet(key, data) { + const _scope = _fileScope('_cacheSet'); + if (this.cache) { + this.cache.set(key, { + added: new Date(), + hits: 0, + lastHit: undefined, + data, + }); + this.logger.debug(_scope, 'added cache entry', { key }); + } + } + + async context(fn) { return this.db.task(async (t) => fn(t)); } @@ -642,7 +739,7 @@ class DatabasePostgres extends Database { let topics; try { topics = await dbCtx.manyOrNone(this.statement.topicGetInfoAll); - } catch (e) { + } catch (e) { this.logger.error(_scope, 'failed', { error: e, topics }); throw e; } @@ -692,8 +789,14 @@ class DatabasePostgres extends Database { let topic; try { + topic = this._cacheGet(topicId); + if (topic) { + return topic; + } topic = await dbCtx.oneOrNone(this.statement.topicGetContentById, { topicId }); - return this._topicDefaults(topic); + const topicWithDefaults = this._topicDefaults(topic); + this._cacheSet(topicId, topicWithDefaults); + return topicWithDefaults; } catch (e) { this.logger.error(_scope, 'failed', { error: e, topic, topicId }); throw e; diff --git a/src/db/postgres/listener.js b/src/db/postgres/listener.js new file mode 100644 index 0000000..6ad387a --- /dev/null +++ b/src/db/postgres/listener.js @@ -0,0 +1,169 @@ +'use strict'; + +const common = require('../../common'); + +const _fileScope = common.fileScope(__filename); + + +const defaultOptions = { + channel: 'cache_invalidation', + dataCallback: common.nop, + connectionLostCallback: common.nop, + connectionEstablishedCallback: common.nop, + pingDelayMs: 5000, + reconnectDelayMs: 6000, + reconnectTimes: 10, +}; + +/** + * Create a robust connection which listens to a notification channel. + */ +class PostgresListener { + constructor(logger, db, options) { + this.logger = logger; + this.db = db; + + this.options = Object.assign({}, defaultOptions, options); + this.notificationEventName = 'notification'; + + this.connection = null; + this.nextPingTimeout = undefined; + + this._onConnectionLostBound = this._onConnectionLost.bind(this); + this._onNotificationBound = this._onNotification.bind(this); + } + + + /** + * Establish the listener connection. + */ + async start() { + await this._reconnect(0, 1); + this._sendPing(); + } + + + /** + * Shut down the listener connection. + */ + async stop() { + const _scope = _fileScope('stop'); + if (this.reconnectPending) { + this.logger.debug(_scope, 'overriding existing reconnect retry'); + clearTimeout(this.reconnectPending); + delete this.reconnectPending; + } + if (this.connection) { + this.connection.client.removeListener(this.notificationEventName, this.onNotificationBound); + this.connection.done(); + this.connection = null; + await this.options.connectionLostCallback(); + } + } + + + /** + * Begin sending connection pings. + */ + _sendPing() { + const _scope = _fileScope('_sendPing'); + this.nextPingTimeout = setTimeout(async () => { + try { + if (this.connection) { + await this.connection.none('NOTIFY $(channel:name), $(payload)', { channel: this.options.channel, payload: 'ping' }); + } + } catch (e) { + this.logger.error(_scope, 'failed', { error: e }); + } finally { + this._sendPing(); + } + }, this.options.pingDelayMs); + } + + + /** + * Notify callback. + * @param {Object} data + */ + async _onNotification(data) { + const _scope = _fileScope('_onNotification'); + // Ignore our own messages + if (data.payload === 'ping') { + return; + } + this.logger.debug(_scope, 'called', data); + await this.options.dataCallback(data.payload); + } + + + /** + * Notify callback and attempt to reconnect. + * @param {*} error + * @param {*} event + */ + async _onConnectionLost(error, event) { + const _scope = _fileScope('_onConnectionLost'); + this.logger.error(_scope, 'listener connection lost', { error, event }); + this.connection = null; + try { + event.client.removeListener(this.notificationEventName, this.onNotificationBound); + } catch (e) { + this.logger.error(_scope, 'failed to remove listener', { error: e }); + // That's okay, it was probably just gone anyhow. + } + await this.options.connectionLostCallback(); + try { + await this._reconnect(this.options.reconnectDelayMs, this.options.reconnectTimes); + } catch (e) { + this.logger.error(_scope, 'failed to reconnect listener', { error: e }); + } + } + + + /** + * Schedule an attempt to establish a connection. + * @param {Number} delay + * @param {Number} retriesRemaining + */ + async _reconnect(delay, retriesRemaining) { + const _scope = _fileScope('_reconnect'); + if (this.connection) { + this.logger.debug(_scope, 'closing existing connection'); + this.connection.done(); + this.connection = null; + } + if (this.reconnectPending) { + this.logger.debug(_scope, 'overriding existing reconnect retry'); + clearTimeout(this.reconnectPending); + } + return new Promise((resolve, reject) => { + this.reconnectPending = setTimeout(async () => { + try { + delete this.reconnectPending; + this.connection = await this.db.connect({ + direct: true, + onLost: this._onConnectionLostBound, + }); + this.connection.client.on(this.notificationEventName, this._onNotificationBound); + await this.connection.none('LISTEN $(channel:name)', { channel: this.options.channel }); + this.logger.debug(_scope, 'listener connection established'); + await this.options.connectionEstablishedCallback(); + resolve(); + } catch (e) { + if (retriesRemaining <= 0) { + return reject(e); + } + try { + await this._reconnect(delay, retriesRemaining - 1); + resolve(); + } catch (e2) { + reject(e2); + } + } + }, delay); + }); + } + +} + +module.exports = PostgresListener; \ No newline at end of file diff --git a/src/db/sqlite/index.js b/src/db/sqlite/index.js index 775708d..a4c3d38 100644 --- a/src/db/sqlite/index.js +++ b/src/db/sqlite/index.js @@ -687,7 +687,7 @@ class DatabaseSQLite extends Database { let topics; try { topics = this.statement.topicGetInfoAll.all(); - } catch (e) { + } catch (e) { this.logger.error(_scope, 'failed', { error: e, topics }); throw e; } diff --git a/src/link-helper.js b/src/link-helper.js index 428f031..1f3cd09 100644 --- a/src/link-helper.js +++ b/src/link-helper.js @@ -117,8 +117,7 @@ class LinkHelper { .map(([name, value]) => ({ name, value })), }; links.push(link); - }); - + }); }); feedParser.on('readable', () => { let _item; @@ -147,8 +146,8 @@ class LinkHelper { const link = { target: attributes.href, attributes: Object.entries(attributes) - .filter(([name]) => name !== 'href') - .map(([name, value]) => ({ name, value })), + .filter(([name]) => name !== 'href') + .map(([name, value]) => ({ name, value })), }; links.push(link); } @@ -159,6 +158,7 @@ class LinkHelper { return links; } + /** * Attempt to resolve a relative target URI * @param {String} uri diff --git a/src/manager.js b/src/manager.js index b3aff45..6fb187c 100644 --- a/src/manager.js +++ b/src/manager.js @@ -40,7 +40,7 @@ class Manager { * @param {http.ServerResponse} res * @param {object} ctx */ - async getHealthcheck(res, ctx) { + async getHealthcheck(res, ctx) { const _scope = _fileScope('getHealthcheck'); const health = 'happy'; diff --git a/src/service.js b/src/service.js index dfba6b8..c69f6fb 100644 --- a/src/service.js +++ b/src/service.js @@ -66,7 +66,7 @@ class Service extends Dingus { * @param {Object} ctx * @param {String} newPath */ - async handlerRedirect(req, res, ctx, newPath) { + async handlerRedirect(req, res, ctx, newPath) { const _scope = _fileScope('handlerRedirect'); this.logger.debug(_scope, 'called', { req: common.requestLogData(req), ctx }); @@ -185,8 +185,9 @@ class Service extends Dingus { await this.manager.getTopicDetails(res, ctx); } + /** - * Same as super.ingestBody, but if no body was send, do not parse (and + * Same as super.ingestBody, but if no body was sent, do not parse (and * thus avoid possible unsupported media type error). * @param {http.ClientRequest} req * @param {http.ServerResponse} res @@ -206,7 +207,7 @@ class Service extends Dingus { * @param {http.ServerResponse} res * @param {Object} ctx */ - async handlerUpdateTopic(req, res, ctx) { + async handlerUpdateTopic(req, res, ctx) { const _scope = _fileScope('handlerUpdateTopic'); this.logger.debug(_scope, 'called', { req: common.requestLogData(req), ctx }); @@ -226,17 +227,17 @@ class Service extends Dingus { * @param {Object} ctx */ async handlerUpdateSubscription(req, res, ctx) { - const _scope = _fileScope('handlerUpdateSubscription'); - this.logger.debug(_scope, 'called', { req: common.requestLogData(req), ctx }); + const _scope = _fileScope('handlerUpdateSubscription'); + this.logger.debug(_scope, 'called', { req: common.requestLogData(req), ctx }); - this.setResponseType(this.responseTypes, req, res, ctx); + this.setResponseType(this.responseTypes, req, res, ctx); - await this.authenticator.required(req, res, ctx); + await this.authenticator.required(req, res, ctx); - await this.maybeIngestBody(req, res, ctx); - ctx.method = req.method; - await this.manager.updateSubscription(res, ctx); -} + await this.maybeIngestBody(req, res, ctx); + ctx.method = req.method; + await this.manager.updateSubscription(res, ctx); + } /** @@ -254,7 +255,7 @@ class Service extends Dingus { this.setResponseType(this.responseTypes, req, res, ctx); await this.serveFile(req, res, ctx, this.staticPath, file || ctx.params.file); - this.logger.info(_scope, 'finished', { ctx }); + this.logger.info(_scope, 'finished', { ctx: { ...ctx, responseBody: common.logTruncate((ctx.responseBody || '').toString(), 100) } }); } diff --git a/src/template/root-html.js b/src/template/root-html.js index 97a1ad9..d1939b8 100644 --- a/src/template/root-html.js +++ b/src/template/root-html.js @@ -83,7 +83,7 @@ function usageSection(isPublicHub, hubURL) { ` - : ` + : `

Private Hub

This hub only serves specific topics. diff --git a/src/template/template-helper.js b/src/template/template-helper.js index 57a3f50..b48a90a 100644 --- a/src/template/template-helper.js +++ b/src/template/template-helper.js @@ -30,7 +30,7 @@ const dateOrNot = (date, otherwise) => { * @param {Number} seconds * @returns {String} */ - const secondsToPeriod = (seconds) => { +const secondsToPeriod = (seconds) => { let value = seconds; const result = []; @@ -212,7 +212,7 @@ function htmlHeader(pageTitle, navLinks = []) {

    ${navLinks.map((l) => renderNavLink(l)).join('\n')}
` - : '') + ` + : '') + `
`; @@ -227,13 +227,15 @@ function htmlFooter() { return `
`; } diff --git a/src/worker.js b/src/worker.js index f956ba1..ca77369 100644 --- a/src/worker.js +++ b/src/worker.js @@ -95,7 +95,7 @@ class Worker { isSettled = true; rejected = rej; throw rej; - }); + }); Object.defineProperties(promise, { isSettled: { get: () => isSettled }, diff --git a/static/theme.css b/static/theme.css index 06f03a9..3e4c4f4 100644 --- a/static/theme.css +++ b/static/theme.css @@ -5,7 +5,7 @@ html { body { background-color: #fff; font-family: Helvetica, Verdana, sans-serif; - margin: 1em; + margin: 0 1em 0 1em; min-height: 100vh; display: flex; flex-direction: column; @@ -23,12 +23,13 @@ header nav ol li a { text-align: center; } h1 { - margin-top: 1.3em; - margin-bottom: 2.5em; + margin-top: 1em; + margin-bottom: 1.25em; text-align: center; } h2 { background-color: #ddd; + padding: .25em 0 .1em 0.25em; } main { flex-grow: 1; @@ -60,9 +61,8 @@ footer { width: 100%; border-top: 4px dotted #666; } -footer nav ol { +footer ol { list-style-type: none; - margin: 0; + margin: .5em; padding: 0; - border: 1px solid #000; } diff --git a/test/src/db/base.js b/test/src/db/base.js index 18871f4..7863657 100644 --- a/test/src/db/base.js +++ b/test/src/db/base.js @@ -108,7 +108,7 @@ describe('DatabaseBase', function () { }); }); // _ensureTypes - describe('schemaCheck', function () { + describe('initialize', function () { let currentSchema; beforeEach(function () { currentSchema = { @@ -123,7 +123,7 @@ describe('DatabaseBase', function () { sinon.stub(db, '_currentSchema').resolves(currentSchema); }); it('covers success', async function () { - await db.schemaCheck(); + await db.initialize(); }); it('covers failure', async function() { db.schemaVersionsSupported = { @@ -139,13 +139,13 @@ describe('DatabaseBase', function () { }, }; try { - await db.schemaCheck(); + await db.initialize(); assert.fail('did not get expected exception'); } catch (e) { assert(e instanceof DBErrors.MigrationNeeded); } }); - }); // schemaCheck + }); // initialize describe('_topicDefaults', function () { let topic; diff --git a/test/src/db/integration.js b/test/src/db/integration.js index 3cb07f9..e6f632c 100644 --- a/test/src/db/integration.js +++ b/test/src/db/integration.js @@ -69,7 +69,7 @@ describe('Database Integration', function () { // eslint-disable-next-line security/detect-non-literal-require DB = require(i.module); db = new DB(stubLogger, i.config); - await db.schemaCheck(); + await db.initialize(); await db._purgeTables(true); }); after(async function () { diff --git a/test/src/db/postgres-listener.js b/test/src/db/postgres-listener.js new file mode 100644 index 0000000..7926746 --- /dev/null +++ b/test/src/db/postgres-listener.js @@ -0,0 +1,197 @@ +/* eslint-env mocha */ +'use strict'; + +const assert = require('assert'); +const sinon = require('sinon'); +const stubLogger = require('../../stub-logger'); +const Listener = require('../../../src/db/postgres/listener'); + +const snooze = async (ms) => new Promise((resolve) => setTimeout(resolve, ms)); +const noExpectedException = 'did not get expected exception'; + +describe('Postgres Listener', function () { + let listener, options, connectionStub, pgpStub; + beforeEach(function () { + connectionStub = { + client: { + on: sinon.stub(), + removeListener: sinon.stub(), + }, + done: sinon.stub(), + none: sinon.stub(), + }; + pgpStub = { + connect: sinon.stub().resolves(connectionStub), + }; + options = { + dataCallback: sinon.stub(), + connectionLostCallback: sinon.stub(), + connectionEstablishedCallback: sinon.stub(), + pingDelayMs: 100, + reconnectDelayMs: 1000, + reconnectTimes: 1, + }; + listener = new Listener(stubLogger, pgpStub, options); + }); + afterEach(function () { + sinon.restore(); + }); + + describe('start', function () { + it('covers', async function () { + sinon.stub(listener, '_reconnect').resolves(); + sinon.stub(listener, '_sendPing').resolves(); + await listener.start(); + assert(listener._reconnect.called); + assert(listener._sendPing.called); + }); + }); // start + + describe('stop', function () { + it('covers not started', async function () { + await listener.stop(); + }); + it('cancels pending reconnect', async function() { + const pendingReconnect = sinon.stub(); + listener.reconnectPending = setTimeout(pendingReconnect, 100); + await listener.stop(); + snooze(110); + assert(!pendingReconnect.called); + }); + it('closes existing connection', async function () { + listener.connection = connectionStub; + await listener.stop(); + assert(connectionStub.client.removeListener.called); + assert.strictEqual(listener.connection, null); + assert(options.connectionLostCallback.called); + }); + }); // stop + + describe('_reconnect', function () { + it('reconnects', async function () { + await listener._reconnect(0, 1); + assert(listener.connection); + assert(options.connectionEstablishedCallback.called); + }); + it('closes existing connection before reconnecting', async function () { + const existingConnection = { + done: sinon.stub(), + }; + listener.connection = existingConnection; + await listener._reconnect(0, 1); + assert(existingConnection.done.called); + }); + it('overrides a pending reconnect', async function () { + this.slow(300); + const pendingReconnect = sinon.stub(); + listener.reconnectPending = setTimeout(pendingReconnect, 100); + await listener._reconnect(0, 1); + await snooze(110); + assert(!pendingReconnect.called); + }); + it('fails with no remaining retries', async function () { + const expected = new Error('foo'); + pgpStub.connect = sinon.stub().rejects(expected); + try { + await listener._reconnect(0, 0); + assert.fail(noExpectedException); + } catch (e) { + assert.deepStrictEqual(e, expected); + } + }); + it('fails all remaining retries', async function () { + const expected = new Error('foo'); + pgpStub.connect = sinon.stub().rejects(expected); + try { + await listener._reconnect(0, 1); + assert.fail(noExpectedException); + } catch (e) { + assert.deepStrictEqual(e, expected); + } + }); + it('fails first retry', async function () { + const expected = new Error('foo'); + pgpStub.connect = sinon.stub().onCall(0).rejects(expected).resolves(connectionStub); + await listener._reconnect(0, 1); + assert(options.connectionEstablishedCallback.called); + }); + }); // _reconnect + + describe('_onConnectionLost', function () { + let error, event; + beforeEach(function () { + error = new Error('blah'); + event = connectionStub; + sinon.stub(listener, '_reconnect'); + }); + it('success', async function () { + await listener._onConnectionLost(error, event); + assert.strictEqual(listener.connection, null); + assert(event.client.removeListener.called); + assert(listener.options.connectionLostCallback.called); + assert(listener._reconnect.called); + }); + it('covers reconnect failure', async function () { + listener._reconnect.rejects(error); + await listener._onConnectionLost(error, event); + assert.strictEqual(listener.connection, null); + assert(event.client.removeListener.called); + assert(listener.options.connectionLostCallback.called); + assert(listener._reconnect.called); + }); + it('covers listener removal failure', async function () { + event.client.removeListener.throws(error); + await listener._onConnectionLost(error, event); + assert.strictEqual(listener.connection, null); + assert(event.client.removeListener.called); + assert(listener.options.connectionLostCallback.called); + assert(listener._reconnect.called); + }); + }); // _onConnectionLost + + describe('_onNotification', function () { + it('sends data', async function () { + const data = { + payload: 'foo', + }; + await listener._onNotification(data); + assert(listener.options.dataCallback.called); + }); + it('ignores pings', async function () { + const data = { + payload: 'ping', + }; + await listener._onNotification(data); + assert(!listener.options.dataCallback.called); + }); + }); // _onNotification + + describe('_sendPing', function () { + it('covers no connection', async function () { + this.slow(300); + await listener._sendPing(); + await snooze(110); + clearTimeout(listener.nextPingTimeout); + }); + it('success', async function () { + this.slow(300); + listener.connection = connectionStub; + await listener._sendPing(); + await snooze(110); + clearTimeout(listener.nextPingTimeout); + assert(connectionStub.none.called); + }); + it('covers error', async function () { + const err = new Error('blah'); + this.slow(300); + listener.connection = connectionStub; + listener.connection.none.rejects(err); + await listener._sendPing(); + await snooze(110); + clearTimeout(listener.nextPingTimeout); + assert(listener.connection.none.called); + + }); + }); // _sendPing + +}); // Postgres Listener diff --git a/test/src/db/postgres.js b/test/src/db/postgres.js index 5df49fd..ef47905 100644 --- a/test/src/db/postgres.js +++ b/test/src/db/postgres.js @@ -70,6 +70,13 @@ describe('DatabasePostgres', function () { sinon.restore(); }); + it('covers listener', function () { + const listenerOptions = new Config('test'); + listenerOptions.db.cacheEnabled = true; + const listenerDb = new DB(stubLogger, listenerOptions, pgpStub); + assert(listenerDb); + }); + // Ensure all interface methods are implemented describe('Implementation', function () { it('implements interface', async function () { @@ -104,6 +111,11 @@ describe('DatabasePostgres', function () { db.pgpInitOptions.query(event); assert(db.logger.debug.called); }); + it('covers NOTIFY', function () { + const event = { query: 'NOTIFY thing' }; + db.pgpInitOptions.query(event); + assert(!db.logger.debug.called); + }); }); // query describe('receive', function () { it('covers', function () { @@ -133,6 +145,35 @@ describe('DatabasePostgres', function () { assert(db.logger.debug.called); assert.deepStrictEqual(data, expectedData); }); + it('covers NOTIFY', function () { + const data = [ + { + column_one: 'one', // eslint-disable-line camelcase + column_two: 2, // eslint-disable-line camelcase + }, + { + column_one: 'foo', // eslint-disable-line camelcase + column_two: 4, // eslint-disable-line camelcase + }, + ]; + const result = { + command: 'NOTIFY', + }; + const event = {}; + const expectedData = [ + { + columnOne: 'one', + columnTwo: 2, + }, + { + columnOne: 'foo', + columnTwo: 4, + }, + ]; + db.pgpInitOptions.receive(data, result, event) + assert(!db.logger.debug.called); + assert.deepStrictEqual(data, expectedData); + }); }); // receive }); // pgpInitOptions @@ -155,17 +196,20 @@ describe('DatabasePostgres', function () { }); }); // _initTables - describe('schemaCheck', function () { + describe('initialize', function () { + after(function () { + delete db.listener; + }); it('passes supported version', async function () { const version = { major: 1, minor: 0, patch: 0 }; sinon.stub(db.db, 'one').resolves(version); - await db.schemaCheck(false); + await db.initialize(false); }); it('fails low version', async function () { const version = { major: 0, minor: 0, patch: 0 }; sinon.stub(db.db, 'one').resolves(version); try { - await db.schemaCheck(false); + await db.initialize(false); assert.fail(noExpectedException); } catch (e) { assert(e instanceof DBErrors.MigrationNeeded); @@ -175,7 +219,7 @@ describe('DatabasePostgres', function () { const version = { major: 100, minor: 100, patch: 100 }; sinon.stub(db.db, 'one').resolves(version); try { - await db.schemaCheck(false); + await db.initialize(false); assert.fail(noExpectedException); } catch (e) { assert(e instanceof DBErrors.MigrationNeeded); @@ -186,9 +230,18 @@ describe('DatabasePostgres', function () { sinon.stub(db.db, 'multiResult'); sinon.stub(db, '_currentSchema').resolves(db.schemaVersionsSupported.max); sinon.stub(db.db, 'one').resolves(db.schemaVersionsSupported.max); - await db.schemaCheck(); + await db.initialize(); }); - }); // schemaCheck + it('covers listener', async function() { + db.listener = { + start: sinon.stub(), + }; + const version = { major: 1, minor: 0, patch: 0 }; + sinon.stub(db.db, 'one').resolves(version); + await db.initialize(false); + assert(db.listener.start.called); + }); + }); // initialize describe('healthCheck', function () { beforeEach(function () { @@ -228,6 +281,9 @@ describe('DatabasePostgres', function () { }); // _queryFileHelper describe('_closeConnection', function () { + after(function () { + delete db.listener; + }); it('success', async function () { sinon.stub(db._pgp, 'end'); await db._closeConnection(); @@ -243,6 +299,14 @@ describe('DatabasePostgres', function () { assert.deepStrictEqual(e, expected); } }); + it('covers listener', async function () { + db.listener = { + stop: sinon.stub(), + }; + sinon.stub(db._pgp, 'end'); + await db._closeConnection(); + assert(db._pgp.end.called); + }); }); // _closeConnection describe('_purgeTables', function () { @@ -268,6 +332,84 @@ describe('DatabasePostgres', function () { }); }); // _purgeTables + describe('_topicChanged', function () { + beforeEach(function () { + db.cache = new Map(); + sinon.stub(db.cache, 'delete'); + }); + after(function () { + delete db.cache; + }); + it('covers', function () { + db._topicChanged('topic-id'); + assert(db.cache.delete.called); + }); + it('ignores ping', function () { + db._topicChanged('ping'); + assert(!db.cache.delete.called); + }); + }); // _topicChanged + + describe('_listenerEstablished', function () { + it('creates cache', function () { + delete db.cache; + db._listenerEstablished(); + assert(db.cache instanceof Map); + }); + }); // _listenerEstablished + + describe('_listenerLost', function () { + it('removes cache', function () { + db.cache = new Map(); + db._listenerLost(); + assert(!db.cache); + }); + }); // _listenerLost + + describe('_cacheGet', function () { + let key; + beforeEach(function () { + key = 'key'; + }); + it('nothing if no cache', function () { + delete db.cache; + const result = db._cacheGet(key); + assert.strictEqual(result, undefined); + }); + it('nothing if no entry', function () { + db.cache = new Map(); + const result = db._cacheGet(key); + assert.strictEqual(result, undefined); + }); + it('returns cached entry', function () { + db.cache = new Map(); + const expected = { + foo: 'bar', + }; + db._cacheSet(key, expected); + const result = db._cacheGet(key); + assert.deepStrictEqual(result, expected); + }); + }); // _cacheGet + + describe('_cacheSet', function () { + let key; + beforeEach(function () { + key = 'key'; + }); + it('covers no cache', function () { + delete db.cache; + db._cacheSet(key, 'data'); + }); + it('covers cache', function () { + db.cache = new Map(); + const expected = 'blah'; + db._cacheSet(key, expected); + const result = db._cacheGet(key); + assert.deepStrictEqual(result, expected); + }); + }); // _cacheSet + describe('context', function () { it('covers', async function () { await db.context(common.nop); @@ -1024,8 +1166,15 @@ describe('DatabasePostgres', function () { }); // topicGetByUrl describe('topicGetContentById', function () { + let topic; + beforeEach(function () { + delete db.cache; + topic = { + id: topicId, + }; + }); it('success', async function() { - const expected = { id: topicId }; + const expected = topic; sinon.stub(db.db, 'oneOrNone').resolves(expected); const result = await db.topicGetContentById(dbCtx, topicId); assert.deepStrictEqual(result, expected); @@ -1046,6 +1195,23 @@ describe('DatabasePostgres', function () { assert.deepStrictEqual(e, expected); } }); + it('caches success', async function () { + db.cache = new Map(); + const expected = topic; + sinon.stub(db.db, 'oneOrNone').resolves(expected); + const result = await db.topicGetContentById(dbCtx, topicId); + assert.deepStrictEqual(result, expected); + }); + it('covers cached entry', async function() { + let result; + db.cache = new Map(); + const expected = topic; + sinon.stub(db.db, 'oneOrNone').resolves(expected); + result = await db.topicGetContentById(dbCtx, topicId); + assert.deepStrictEqual(result, expected); + result = await db.topicGetContentById(dbCtx, topicId); + assert.deepStrictEqual(result, expected); + }); }); // topicGetContentById describe('topicSet', function () { @@ -1185,7 +1351,7 @@ describe('DatabasePostgres', function () { } }); - }); + }); // topicUpdate describe('verificationClaim', function () { it('success', async function() { diff --git a/test/stub-db.js b/test/stub-db.js index 608ea77..5ef2422 100644 --- a/test/stub-db.js +++ b/test/stub-db.js @@ -13,7 +13,7 @@ const stubFns = [ 'authenticationGet', 'authenticationUpsert', 'healthCheck', - 'schemaCheck', + 'initialize', 'subscriptionsByTopicId', 'subscriptionCountByTopicUrl', 'subscriptionDelete',