"error",
"last"
],
+ "indent": [
+ "warn",
+ 2,
+ {
+ "SwitchCase": 1
+ }
+ ],
"sonarjs/cognitive-complexity": "warn",
"sonarjs/no-duplicate-string": "warn",
"keyword-spacing": "error",
--- /dev/null
+{
+ "MD013": false,
+ "MD024": false
+}
## [Unreleased]
+## [v1.1.0] - 2021-08-08
+
+### Added
+
+- Caching of topic contents for Postgres database backends. This should greatly reduce the db load when many subscribers to a topic are delivered an update.
+- Minor cleanup to generated HTML pages.
+
## [v1.0.0] - 2021-08-01
### Added
-Everything. MVP first stable release.
+- Everything. MVP first stable release.
---
-[Unreleased]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=HEAD;hp=v1.0.0
+[Unreleased]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=HEAD;hp=v1.1.0
+[v1.1.0]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=v1.1.0;hp=v1.0.0
[v1.0.0]: https://git.squeep.com/?p=websub-hub;a=commitdiff;h=v1.0.0;hp=v0.0.0
\ No newline at end of file
}
(async () => {
+ await db.initialize();
const password = await readPassword('password: ');
const credential = await argon2.hash(password, { type: argon2.argon2id });
console.log(`\t${identifier}:${credential}`);
db: {
connectionString: '', // e.g. sqlite://path/to/dbfile.sqlite
queryLogLevel: undefined, // Set to log queries
+ cacheEnabled: true, // Cache some db responses. (Postgres only)
+ listener: { // Settings for the cache-invalidator connection. (Postgres only)
+ // pingDelayMs: 5000, // Connection keep-alive/health-check.
+ // reconnectDelayMs: 6000, // Wait time before attempting reconnection.
+ // reconnectTimes: 10, // Retries limit.
+ },
},
// Logging options
},
db: {
queryLogLevel: 'debug',
+ cacheEnabled: false,
},
};
"dev": true
},
"@squeep/api-dingus": {
- "version": "git+https://git.squeep.com/squeep-api-dingus/#16db6709ab8407b1f696e3d5f92aa6980f182f39",
- "from": "git+https://git.squeep.com/squeep-api-dingus/#v1.0.0",
+ "version": "git+https://git.squeep.com/squeep-api-dingus/#12b96f53e7976b74296c1e024432b88749e6c4b0",
+ "from": "git+https://git.squeep.com/squeep-api-dingus/#v1.1-dev",
"requires": {
"mime-db": "^1.49.0",
"uuid": "^8.3.2"
},
"dependencies": {
- "mime-db": {
- "version": "1.49.0",
- "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.49.0.tgz",
- "integrity": "sha512-CIc8j9URtOVApSFCQIF+VBkX1RwXp/oMMOrqdyXSBXq5RWNEsRfyj1kiRnQgmNXmHxPoFIxOroKA3zcU9P+nAA=="
- },
"uuid": {
"version": "8.3.2",
"resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz",
}
}
},
+ "mime-db": {
+ "version": "1.49.0",
+ "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.49.0.tgz",
+ "integrity": "sha512-CIc8j9URtOVApSFCQIF+VBkX1RwXp/oMMOrqdyXSBXq5RWNEsRfyj1kiRnQgmNXmHxPoFIxOroKA3zcU9P+nAA=="
+ },
"mimic-response": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/mimic-response/-/mimic-response-2.1.0.tgz",
}
},
"tar": {
- "version": "6.1.0",
- "resolved": "https://registry.npmjs.org/tar/-/tar-6.1.0.tgz",
- "integrity": "sha512-DUCttfhsnLCjwoDoFcI+B2iJgYa93vBnDUATYEeRx6sntCTdN01VnqsIuTlALXla/LWooNg0yEGeB+Y8WdFxGA==",
+ "version": "6.1.6",
+ "resolved": "https://registry.npmjs.org/tar/-/tar-6.1.6.tgz",
+ "integrity": "sha512-oaWyu5dQbHaYcyZCTfyPpC+VmI62/OM2RTUYavTk1MDr1cwW5Boi3baeYQKiZbY2uSQJGr+iMOzb/JFxLrft+g==",
"requires": {
"chownr": "^2.0.0",
"fs-minipass": "^2.0.0",
{
"name": "websub-hub",
- "version": "1.0.0",
+ "version": "1.1.0",
"description": "A WebSub Hub server implementation.",
"main": "server.js",
"scripts": {
"coverage-check"
],
"dependencies": {
- "@squeep/api-dingus": "git+https://git.squeep.com/squeep-api-dingus/#v1.0.0",
+ "@squeep/api-dingus": "git+https://git.squeep.com/squeep-api-dingus/#v1.1.0",
"@squeep/web-linking": "git+https://git.squeep.com/squeep-web-linking/#v1.0.0",
"argon2": "^0.28.2",
"axios": "^0.21.1",
config = new Config(process.env.NODE_ENV);
logger = new Logger(config);
db = new DB(logger, config);
- await db.schemaCheck();
+ await db.initialize();
service = new Service(logger, db, config);
http.createServer((req, res) => {
const authData = req.getHeader(Enum.Header.Authorization);
if (authData
&& await this.isValidAuthorization(authData, ctx)) {
- return true;
+ return true;
}
return this.requestBasic(res);
}
* @param {Number} jitter
* @returns {Number}
*/
- const attemptRetrySeconds = (attempt, retryBackoffSeconds = [60, 120, 360, 1440, 7200, 43200, 86400], jitter = 0.618) => {
+const attemptRetrySeconds = (attempt, retryBackoffSeconds = [60, 120, 360, 1440, 7200, 43200, 86400], jitter = 0.618) => {
const maxAttempt = retryBackoffSeconds.length - 1;
if (typeof attempt !== 'number' || attempt < 0) {
attempt = 0;
* @param {Array} array
* @param {Number} per
*/
- const arrayChunk = (array, per = 1) => {
+const arrayChunk = (array, per = 1) => {
const nChunks = Math.ceil(array.length / per);
return Array.from(Array(nChunks), (_, i) => array.slice(i * per, (i + 1) * per));
}
* @param {Array} dst
* @param {Array} src
*/
- const stackSafePush = (dst, src) => {
+const stackSafePush = (dst, src) => {
const jsEngineMaxArguments = 2**16; // Current as of Node 12
arrayChunk(src, jsEngineMaxArguments).forEach((items) => {
Array.prototype.push.apply(dst, items);
* @param {String} method
* @param {arguments} args
*/
- _notImplemented(method, args) {
+ _notImplemented(method, args) {
this.logger.error(_fileScope(method), 'abstract method called', Array.from(args));
throw new DBErrors.NotImplemented(method);
}
/**
- * Validate schema compatibility.
- * Ensure this is called immediately after instantiating a DB instance,
- * as some engines also finish initialization and validation here, which
- * was easier than wrangling async calls in constructor.
- * In light of this behavior, this method could be named better.
- */
- async schemaCheck() {
- const _scope = _fileScope('schemaCheck');
+ * Perform tasks needed to prepare database for use. Ensure this is called
+ * after construction, and before any other database activity.
+ * At the minimum, this will validate a compatible schema is present and usable.
+ * Some engines will also perform other initializations or async actions which
+ * are easier handled outside the constructor.
+ */
+ async initialize() {
+ const _scope = _fileScope('initialize');
const currentSchema = await this._currentSchema();
const current = svh.schemaVersionObjectToNumber(currentSchema);
* @param {String} callback
* @param {*} topicId
*/
- async subscriptionGet(dbCtx, callback, topicId) {
+ async subscriptionGet(dbCtx, callback, topicId) {
this._notImplemented('subscriptionGet', arguments);
}
* @param {String=} data.httpRemoteAddr
* @param {String=} data.httpFrom
*/
- async subscriptionUpsert(dbCtx, data) {
+ async subscriptionUpsert(dbCtx, data) {
this._notImplemented('subscriptionUpsert', arguments);
}
* @param {*} topicId
* @returns {Boolean}
*/
- async topicFetchRequested(dbCtx, topicId) {
+ async topicFetchRequested(dbCtx, topicId) {
this._notImplemented('topicPublish', arguments);
}
* @param {Boolean} claim
* @returns {*} verificationId
*/
- async verificationInsert(dbCtx, verification) {
+ async verificationInsert(dbCtx, verification) {
this._notImplemented('verificationInsert', arguments);
}
* @param {String} data.reason
* @param {Boolean} data.isPublisherValidated
*/
- async verificationUpdate(dbCtx, verificationId, data) {
+ async verificationUpdate(dbCtx, verificationId, data) {
this._notImplemented('verificationUpdate', arguments);
- }
+ }
/**
const svh = require('../schema-version-helper');
const Database = require('../base');
const DBErrors = require('../errors');
+const Listener = require('./listener');
const common = require('../../common');
const _fileScope = common.fileScope(__filename);
// Suppress QF warnings when running tests
this.noWarnings = options.db.noWarnings;
+ if (options.db.cacheEnabled) {
+ this.listener = new Listener(logger, this.db, Object.assign({}, options.db.listener, {
+ channel: 'topic_changed',
+ dataCallback: this._topicChanged.bind(this),
+ connectionEstablishedCallback: this._listenerEstablished.bind(this),
+ connectionLostCallback: this._listenerLost.bind(this),
+ }));
+ }
+
// Log queries
const queryLogLevel = options.db.queryLogLevel;
if (queryLogLevel) {
pgpInitOptions.query = (event) => {
+ // Quell outgoing pings
+ if (event && event.query && event.query.startsWith('NOTIFY')) {
+ return;
+ }
this.logger[queryLogLevel](_fileScope('pgp:query'), '', { ...common.pick(event, ['query', 'params']) });
};
}
}
}
if (queryLogLevel) {
+ // Quell outgoing pings
+ if (result && result.command === 'NOTIFY') {
+ return;
+ }
// Omitting .rows
const resultLog = common.pick(result, ['command', 'rowCount', 'duration']);
this.logger[queryLogLevel](_fileScope('pgp:result'), '', { query: event.query, ...resultLog });
_queryFileHelper(_pgp) {
return (file) => {
const _scope = _fileScope('_queryFile');
+ /* istanbul ignore next */
const qfParams = {
minify: true,
...(this.noWarnings && { noWarnings: this.noWarnings }),
}
- async schemaCheck(applyMigrations = true) {
- const _scope = _fileScope('schemaCheck');
+ async initialize(applyMigrations = true) {
+ const _scope = _fileScope('initialize');
this.logger.debug(_scope, 'called', { applyMigrations });
if (applyMigrations) {
await this._initTables();
}
- await super.schemaCheck();
+ await super.initialize();
+ if (this.listener) {
+ await this.listener.start();
+ }
}
async _closeConnection() {
const _scope = _fileScope('_closeConnection');
try {
+ if (this.listener) {
+ await this.listener.stop();
+ }
await this._pgp.end();
} catch (e) {
this.logger.error(_scope, 'failed', { error: e });
}
+ /* istanbul ignore next */
async _purgeTables(really = false) {
const _scope = _fileScope('_purgeTables');
try {
}
+ /**
+ * Receive notices when topic entry is updated.
+ * Clear relevant cache entry.
+ * @param {String} payload
+ */
+ _topicChanged(payload) {
+ const _scope = _fileScope('_topicChanged');
+ if (payload !== 'ping') {
+ this.logger.debug(_scope, 'called', { payload });
+ this.cache.delete(payload);
+ }
+ }
+
+
+ /**
+ * Called when a listener connection is opened.
+ * Enable cache.
+ */
+ _listenerEstablished() {
+ const _scope = _fileScope('_listenerEstablished');
+ this.logger.debug(_scope, 'called', {});
+ this.cache = new Map();
+ }
+
+
+ /**
+ * Called when a listener connection is closed.
+ * Disable cache.
+ */
+ _listenerLost() {
+ const _scope = _fileScope('_listenerLost');
+ this.logger.debug(_scope, 'called', {});
+ delete this.cache;
+ }
+
+
+ /**
+ * Return a cached entry, if available.
+ * @param {*} key
+ */
+ _cacheGet(key) {
+ const _scope = _fileScope('_cacheGet');
+ if (this.cache && this.cache.has(key)) {
+ const cacheEntry = this.cache.get(key);
+ this.logger.debug(_scope, 'found cache entry', { key, ...common.pick(cacheEntry, ['added', 'hits', 'lastHit']) });
+ cacheEntry.hits += 1;
+ cacheEntry.lastHit = new Date();
+ return cacheEntry.data;
+ }
+ }
+
+
+ /**
+ * Store an entry in cache, if available.
+ * @param {*} key
+ * @param {*} data
+ */
+ _cacheSet(key, data) {
+ const _scope = _fileScope('_cacheSet');
+ if (this.cache) {
+ this.cache.set(key, {
+ added: new Date(),
+ hits: 0,
+ lastHit: undefined,
+ data,
+ });
+ this.logger.debug(_scope, 'added cache entry', { key });
+ }
+ }
+
+
async context(fn) {
return this.db.task(async (t) => fn(t));
}
let topics;
try {
topics = await dbCtx.manyOrNone(this.statement.topicGetInfoAll);
- } catch (e) {
+ } catch (e) {
this.logger.error(_scope, 'failed', { error: e, topics });
throw e;
}
let topic;
try {
+ topic = this._cacheGet(topicId);
+ if (topic) {
+ return topic;
+ }
topic = await dbCtx.oneOrNone(this.statement.topicGetContentById, { topicId });
- return this._topicDefaults(topic);
+ const topicWithDefaults = this._topicDefaults(topic);
+ this._cacheSet(topicId, topicWithDefaults);
+ return topicWithDefaults;
} catch (e) {
this.logger.error(_scope, 'failed', { error: e, topic, topicId });
throw e;
--- /dev/null
+'use strict';
+
+const common = require('../../common');
+
+const _fileScope = common.fileScope(__filename);
+
+
+const defaultOptions = {
+ channel: 'cache_invalidation',
+ dataCallback: common.nop,
+ connectionLostCallback: common.nop,
+ connectionEstablishedCallback: common.nop,
+ pingDelayMs: 5000,
+ reconnectDelayMs: 6000,
+ reconnectTimes: 10,
+};
+
+/**
+ * Create a robust connection which listens to a notification channel.
+ */
+class PostgresListener {
+ constructor(logger, db, options) {
+ this.logger = logger;
+ this.db = db;
+
+ this.options = Object.assign({}, defaultOptions, options);
+ this.notificationEventName = 'notification';
+
+ this.connection = null;
+ this.nextPingTimeout = undefined;
+
+ this._onConnectionLostBound = this._onConnectionLost.bind(this);
+ this._onNotificationBound = this._onNotification.bind(this);
+ }
+
+
+ /**
+ * Establish the listener connection.
+ */
+ async start() {
+ await this._reconnect(0, 1);
+ this._sendPing();
+ }
+
+
+ /**
+ * Shut down the listener connection.
+ */
+ async stop() {
+ const _scope = _fileScope('stop');
+ if (this.reconnectPending) {
+ this.logger.debug(_scope, 'overriding existing reconnect retry');
+ clearTimeout(this.reconnectPending);
+ delete this.reconnectPending;
+ }
+ if (this.connection) {
+      this.connection.client.removeListener(this.notificationEventName, this._onNotificationBound);
+ this.connection.done();
+ this.connection = null;
+ await this.options.connectionLostCallback();
+ }
+ }
+
+
+ /**
+ * Begin sending connection pings.
+ */
+ _sendPing() {
+ const _scope = _fileScope('_sendPing');
+ this.nextPingTimeout = setTimeout(async () => {
+ try {
+ if (this.connection) {
+ await this.connection.none('NOTIFY $(channel:name), $(payload)', { channel: this.options.channel, payload: 'ping' });
+ }
+ } catch (e) {
+ this.logger.error(_scope, 'failed', { error: e });
+ } finally {
+ this._sendPing();
+ }
+ }, this.options.pingDelayMs);
+ }
+
+
+ /**
+ * Notify callback.
+ * @param {Object} data
+ */
+ async _onNotification(data) {
+ const _scope = _fileScope('_onNotification');
+ // Ignore our own messages
+ if (data.payload === 'ping') {
+ return;
+ }
+ this.logger.debug(_scope, 'called', data);
+ await this.options.dataCallback(data.payload);
+ }
+
+
+ /**
+ * Notify callback and attempt to reconnect.
+ * @param {*} error
+ * @param {*} event
+ */
+ async _onConnectionLost(error, event) {
+ const _scope = _fileScope('_onConnectionLost');
+ this.logger.error(_scope, 'listener connection lost', { error, event });
+ this.connection = null;
+ try {
+      event.client.removeListener(this.notificationEventName, this._onNotificationBound);
+ } catch (e) {
+ this.logger.error(_scope, 'failed to remove listener', { error: e });
+ // That's okay, it was probably just gone anyhow.
+ }
+ await this.options.connectionLostCallback();
+ try {
+ await this._reconnect(this.options.reconnectDelayMs, this.options.reconnectTimes);
+ } catch (e) {
+ this.logger.error(_scope, 'failed to reconnect listener', { error: e });
+ }
+ }
+
+
+ /**
+ * Schedule an attempt to establish a connection.
+ * @param {Number} delay
+ * @param {Number} retriesRemaining
+ */
+ async _reconnect(delay, retriesRemaining) {
+ const _scope = _fileScope('_reconnect');
+ if (this.connection) {
+ this.logger.debug(_scope, 'closing existing connection');
+ this.connection.done();
+ this.connection = null;
+ }
+ if (this.reconnectPending) {
+ this.logger.debug(_scope, 'overriding existing reconnect retry');
+ clearTimeout(this.reconnectPending);
+ }
+ return new Promise((resolve, reject) => {
+ this.reconnectPending = setTimeout(async () => {
+ try {
+ delete this.reconnectPending;
+ this.connection = await this.db.connect({
+ direct: true,
+ onLost: this._onConnectionLostBound,
+ });
+ this.connection.client.on(this.notificationEventName, this._onNotificationBound);
+ await this.connection.none('LISTEN $(channel:name)', { channel: this.options.channel });
+ this.logger.debug(_scope, 'listener connection established');
+ await this.options.connectionEstablishedCallback();
+ resolve();
+ } catch (e) {
+ if (retriesRemaining <= 0) {
+ return reject(e);
+ }
+ try {
+ await this._reconnect(delay, retriesRemaining - 1);
+ resolve();
+ } catch (e2) {
+ reject(e2);
+ }
+ }
+ }, delay);
+ });
+ }
+
+}
+
+module.exports = PostgresListener;
\ No newline at end of file
let topics;
try {
topics = this.statement.topicGetInfoAll.all();
- } catch (e) {
+ } catch (e) {
this.logger.error(_scope, 'failed', { error: e, topics });
throw e;
}
.map(([name, value]) => ({ name, value })),
};
links.push(link);
- });
-
+ });
});
feedParser.on('readable', () => {
let _item;
const link = {
target: attributes.href,
attributes: Object.entries(attributes)
- .filter(([name]) => name !== 'href')
- .map(([name, value]) => ({ name, value })),
+ .filter(([name]) => name !== 'href')
+ .map(([name, value]) => ({ name, value })),
};
links.push(link);
}
return links;
}
+
/**
* Attempt to resolve a relative target URI
* @param {String} uri
* @param {http.ServerResponse} res
* @param {object} ctx
*/
- async getHealthcheck(res, ctx) {
+ async getHealthcheck(res, ctx) {
const _scope = _fileScope('getHealthcheck');
const health = 'happy';
* @param {Object} ctx
* @param {String} newPath
*/
- async handlerRedirect(req, res, ctx, newPath) {
+ async handlerRedirect(req, res, ctx, newPath) {
const _scope = _fileScope('handlerRedirect');
this.logger.debug(_scope, 'called', { req: common.requestLogData(req), ctx });
await this.manager.getTopicDetails(res, ctx);
}
+
/**
- * Same as super.ingestBody, but if no body was send, do not parse (and
+ * Same as super.ingestBody, but if no body was sent, do not parse (and
* thus avoid possible unsupported media type error).
* @param {http.ClientRequest} req
* @param {http.ServerResponse} res
* @param {http.ServerResponse} res
* @param {Object} ctx
*/
- async handlerUpdateTopic(req, res, ctx) {
+ async handlerUpdateTopic(req, res, ctx) {
const _scope = _fileScope('handlerUpdateTopic');
this.logger.debug(_scope, 'called', { req: common.requestLogData(req), ctx });
* @param {Object} ctx
*/
async handlerUpdateSubscription(req, res, ctx) {
- const _scope = _fileScope('handlerUpdateSubscription');
- this.logger.debug(_scope, 'called', { req: common.requestLogData(req), ctx });
+ const _scope = _fileScope('handlerUpdateSubscription');
+ this.logger.debug(_scope, 'called', { req: common.requestLogData(req), ctx });
- this.setResponseType(this.responseTypes, req, res, ctx);
+ this.setResponseType(this.responseTypes, req, res, ctx);
- await this.authenticator.required(req, res, ctx);
+ await this.authenticator.required(req, res, ctx);
- await this.maybeIngestBody(req, res, ctx);
- ctx.method = req.method;
- await this.manager.updateSubscription(res, ctx);
-}
+ await this.maybeIngestBody(req, res, ctx);
+ ctx.method = req.method;
+ await this.manager.updateSubscription(res, ctx);
+ }
/**
this.setResponseType(this.responseTypes, req, res, ctx);
await this.serveFile(req, res, ctx, this.staticPath, file || ctx.params.file);
- this.logger.info(_scope, 'finished', { ctx });
+ this.logger.info(_scope, 'finished', { ctx: { ...ctx, responseBody: common.logTruncate((ctx.responseBody || '').toString(), 100) } });
}
</li>
</ul>
</div>`
- : `
+ : `
<h2>Private Hub</h2>
<p>
This hub only serves specific topics.
* @param {Number} seconds
* @returns {String}
*/
- const secondsToPeriod = (seconds) => {
+const secondsToPeriod = (seconds) => {
let value = seconds;
const result = [];
<ol>
${navLinks.map((l) => renderNavLink(l)).join('\n')}
</ol>`
- : '') + `
+ : '') + `
</nav>
</header>
<main>`;
return ` </main>
<footer>
<ol>
- <a href="https://git.squeep.com/?p=websub-hub;a=tree">Development Repository</a>
- </ol>
- <ol>
- <a href="https://squeep.com/">A Squeep Infrastructure Component</a>
- </ol>
- <ol>
- ©<time datetime="2021">ⅯⅯⅩⅩⅠ</time>
+ <li>
+ <a href="https://git.squeep.com/?p=websub-hub;a=tree">Development Repository</a>
+ </li>
+ <li>
+ <a href="https://squeep.com/">A Squeep Infrastructure Component</a>
+ </li>
+ <li>
+ ©<time datetime="2021">ⅯⅯⅩⅩⅠ</time>
+ </li>
</ol>
</footer>`;
}
isSettled = true;
rejected = rej;
throw rej;
- });
+ });
Object.defineProperties(promise, {
isSettled: { get: () => isSettled },
body {
background-color: #fff;
font-family: Helvetica, Verdana, sans-serif;
- margin: 1em;
+ margin: 0 1em 0 1em;
min-height: 100vh;
display: flex;
flex-direction: column;
text-align: center;
}
h1 {
- margin-top: 1.3em;
- margin-bottom: 2.5em;
+ margin-top: 1em;
+ margin-bottom: 1.25em;
text-align: center;
}
h2 {
background-color: #ddd;
+ padding: .25em 0 .1em 0.25em;
}
main {
flex-grow: 1;
width: 100%;
border-top: 4px dotted #666;
}
-footer nav ol {
+footer ol {
list-style-type: none;
- margin: 0;
+ margin: .5em;
padding: 0;
- border: 1px solid #000;
}
});
}); // _ensureTypes
- describe('schemaCheck', function () {
+ describe('initialize', function () {
let currentSchema;
beforeEach(function () {
currentSchema = {
sinon.stub(db, '_currentSchema').resolves(currentSchema);
});
it('covers success', async function () {
- await db.schemaCheck();
+ await db.initialize();
});
it('covers failure', async function() {
db.schemaVersionsSupported = {
},
};
try {
- await db.schemaCheck();
+ await db.initialize();
assert.fail('did not get expected exception');
} catch (e) {
assert(e instanceof DBErrors.MigrationNeeded);
}
});
- }); // schemaCheck
+ }); // initialize
describe('_topicDefaults', function () {
let topic;
// eslint-disable-next-line security/detect-non-literal-require
DB = require(i.module);
db = new DB(stubLogger, i.config);
- await db.schemaCheck();
+ await db.initialize();
await db._purgeTables(true);
});
after(async function () {
--- /dev/null
+/* eslint-env mocha */
+'use strict';
+
+const assert = require('assert');
+const sinon = require('sinon');
+const stubLogger = require('../../stub-logger');
+const Listener = require('../../../src/db/postgres/listener');
+
+const snooze = async (ms) => new Promise((resolve) => setTimeout(resolve, ms));
+const noExpectedException = 'did not get expected exception';
+
+describe('Postgres Listener', function () {
+ let listener, options, connectionStub, pgpStub;
+ beforeEach(function () {
+ connectionStub = {
+ client: {
+ on: sinon.stub(),
+ removeListener: sinon.stub(),
+ },
+ done: sinon.stub(),
+ none: sinon.stub(),
+ };
+ pgpStub = {
+ connect: sinon.stub().resolves(connectionStub),
+ };
+ options = {
+ dataCallback: sinon.stub(),
+ connectionLostCallback: sinon.stub(),
+ connectionEstablishedCallback: sinon.stub(),
+ pingDelayMs: 100,
+ reconnectDelayMs: 1000,
+ reconnectTimes: 1,
+ };
+ listener = new Listener(stubLogger, pgpStub, options);
+ });
+ afterEach(function () {
+ sinon.restore();
+ });
+
+ describe('start', function () {
+ it('covers', async function () {
+ sinon.stub(listener, '_reconnect').resolves();
+ sinon.stub(listener, '_sendPing').resolves();
+ await listener.start();
+ assert(listener._reconnect.called);
+ assert(listener._sendPing.called);
+ });
+ }); // start
+
+ describe('stop', function () {
+ it('covers not started', async function () {
+ await listener.stop();
+ });
+ it('cancels pending reconnect', async function() {
+ const pendingReconnect = sinon.stub();
+ listener.reconnectPending = setTimeout(pendingReconnect, 100);
+ await listener.stop();
+    await snooze(110);
+ assert(!pendingReconnect.called);
+ });
+ it('closes existing connection', async function () {
+ listener.connection = connectionStub;
+ await listener.stop();
+ assert(connectionStub.client.removeListener.called);
+ assert.strictEqual(listener.connection, null);
+ assert(options.connectionLostCallback.called);
+ });
+ }); // stop
+
+ describe('_reconnect', function () {
+ it('reconnects', async function () {
+ await listener._reconnect(0, 1);
+ assert(listener.connection);
+ assert(options.connectionEstablishedCallback.called);
+ });
+ it('closes existing connection before reconnecting', async function () {
+ const existingConnection = {
+ done: sinon.stub(),
+ };
+ listener.connection = existingConnection;
+ await listener._reconnect(0, 1);
+ assert(existingConnection.done.called);
+ });
+ it('overrides a pending reconnect', async function () {
+ this.slow(300);
+ const pendingReconnect = sinon.stub();
+ listener.reconnectPending = setTimeout(pendingReconnect, 100);
+ await listener._reconnect(0, 1);
+ await snooze(110);
+ assert(!pendingReconnect.called);
+ });
+ it('fails with no remaining retries', async function () {
+ const expected = new Error('foo');
+ pgpStub.connect = sinon.stub().rejects(expected);
+ try {
+ await listener._reconnect(0, 0);
+ assert.fail(noExpectedException);
+ } catch (e) {
+ assert.deepStrictEqual(e, expected);
+ }
+ });
+ it('fails all remaining retries', async function () {
+ const expected = new Error('foo');
+ pgpStub.connect = sinon.stub().rejects(expected);
+ try {
+ await listener._reconnect(0, 1);
+ assert.fail(noExpectedException);
+ } catch (e) {
+ assert.deepStrictEqual(e, expected);
+ }
+ });
+ it('fails first retry', async function () {
+ const expected = new Error('foo');
+ pgpStub.connect = sinon.stub().onCall(0).rejects(expected).resolves(connectionStub);
+ await listener._reconnect(0, 1);
+ assert(options.connectionEstablishedCallback.called);
+ });
+ }); // _reconnect
+
+ describe('_onConnectionLost', function () {
+ let error, event;
+ beforeEach(function () {
+ error = new Error('blah');
+ event = connectionStub;
+ sinon.stub(listener, '_reconnect');
+ });
+ it('success', async function () {
+ await listener._onConnectionLost(error, event);
+ assert.strictEqual(listener.connection, null);
+ assert(event.client.removeListener.called);
+ assert(listener.options.connectionLostCallback.called);
+ assert(listener._reconnect.called);
+ });
+ it('covers reconnect failure', async function () {
+ listener._reconnect.rejects(error);
+ await listener._onConnectionLost(error, event);
+ assert.strictEqual(listener.connection, null);
+ assert(event.client.removeListener.called);
+ assert(listener.options.connectionLostCallback.called);
+ assert(listener._reconnect.called);
+ });
+ it('covers listener removal failure', async function () {
+ event.client.removeListener.throws(error);
+ await listener._onConnectionLost(error, event);
+ assert.strictEqual(listener.connection, null);
+ assert(event.client.removeListener.called);
+ assert(listener.options.connectionLostCallback.called);
+ assert(listener._reconnect.called);
+ });
+ }); // _onConnectionLost
+
+ describe('_onNotification', function () {
+ it('sends data', async function () {
+ const data = {
+ payload: 'foo',
+ };
+ await listener._onNotification(data);
+ assert(listener.options.dataCallback.called);
+ });
+ it('ignores pings', async function () {
+ const data = {
+ payload: 'ping',
+ };
+ await listener._onNotification(data);
+ assert(!listener.options.dataCallback.called);
+ });
+ }); // _onNotification
+
+ describe('_sendPing', function () {
+ it('covers no connection', async function () {
+ this.slow(300);
+ await listener._sendPing();
+ await snooze(110);
+ clearTimeout(listener.nextPingTimeout);
+ });
+ it('success', async function () {
+ this.slow(300);
+ listener.connection = connectionStub;
+ await listener._sendPing();
+ await snooze(110);
+ clearTimeout(listener.nextPingTimeout);
+ assert(connectionStub.none.called);
+ });
+ it('covers error', async function () {
+ const err = new Error('blah');
+ this.slow(300);
+ listener.connection = connectionStub;
+ listener.connection.none.rejects(err);
+ await listener._sendPing();
+ await snooze(110);
+ clearTimeout(listener.nextPingTimeout);
+ assert(listener.connection.none.called);
+
+ });
+ }); // _sendPing
+
+}); // Postgres Listener
sinon.restore();
});
+ it('covers listener', function () {
+ const listenerOptions = new Config('test');
+ listenerOptions.db.cacheEnabled = true;
+ const listenerDb = new DB(stubLogger, listenerOptions, pgpStub);
+ assert(listenerDb);
+ });
+
// Ensure all interface methods are implemented
describe('Implementation', function () {
it('implements interface', async function () {
db.pgpInitOptions.query(event);
assert(db.logger.debug.called);
});
+ it('covers NOTIFY', function () {
+ const event = { query: 'NOTIFY thing' };
+ db.pgpInitOptions.query(event);
+ assert(!db.logger.debug.called);
+ });
}); // query
describe('receive', function () {
it('covers', function () {
assert(db.logger.debug.called);
assert.deepStrictEqual(data, expectedData);
});
+ it('covers NOTIFY', function () {
+ const data = [
+ {
+ column_one: 'one', // eslint-disable-line camelcase
+ column_two: 2, // eslint-disable-line camelcase
+ },
+ {
+ column_one: 'foo', // eslint-disable-line camelcase
+ column_two: 4, // eslint-disable-line camelcase
+ },
+ ];
+ const result = {
+ command: 'NOTIFY',
+ };
+ const event = {};
+ const expectedData = [
+ {
+ columnOne: 'one',
+ columnTwo: 2,
+ },
+ {
+ columnOne: 'foo',
+ columnTwo: 4,
+ },
+ ];
+      db.pgpInitOptions.receive(data, result, event);
+ assert(!db.logger.debug.called);
+ assert.deepStrictEqual(data, expectedData);
+ });
}); // receive
}); // pgpInitOptions
});
}); // _initTables
- describe('schemaCheck', function () {
+ describe('initialize', function () {
+ after(function () {
+ delete db.listener;
+ });
it('passes supported version', async function () {
const version = { major: 1, minor: 0, patch: 0 };
sinon.stub(db.db, 'one').resolves(version);
- await db.schemaCheck(false);
+ await db.initialize(false);
});
it('fails low version', async function () {
const version = { major: 0, minor: 0, patch: 0 };
sinon.stub(db.db, 'one').resolves(version);
try {
- await db.schemaCheck(false);
+ await db.initialize(false);
assert.fail(noExpectedException);
} catch (e) {
assert(e instanceof DBErrors.MigrationNeeded);
const version = { major: 100, minor: 100, patch: 100 };
sinon.stub(db.db, 'one').resolves(version);
try {
- await db.schemaCheck(false);
+ await db.initialize(false);
assert.fail(noExpectedException);
} catch (e) {
assert(e instanceof DBErrors.MigrationNeeded);
sinon.stub(db.db, 'multiResult');
sinon.stub(db, '_currentSchema').resolves(db.schemaVersionsSupported.max);
sinon.stub(db.db, 'one').resolves(db.schemaVersionsSupported.max);
- await db.schemaCheck();
+ await db.initialize();
});
- }); // schemaCheck
+ it('covers listener', async function() {
+ db.listener = {
+ start: sinon.stub(),
+ };
+ const version = { major: 1, minor: 0, patch: 0 };
+ sinon.stub(db.db, 'one').resolves(version);
+ await db.initialize(false);
+ assert(db.listener.start.called);
+ });
+ }); // initialize
describe('healthCheck', function () {
beforeEach(function () {
}); // _queryFileHelper
describe('_closeConnection', function () {
+ after(function () {
+ delete db.listener;
+ });
it('success', async function () {
sinon.stub(db._pgp, 'end');
await db._closeConnection();
assert.deepStrictEqual(e, expected);
}
});
+ it('covers listener', async function () {
+ db.listener = {
+ stop: sinon.stub(),
+ };
+ sinon.stub(db._pgp, 'end');
+ await db._closeConnection();
+ assert(db._pgp.end.called);
+ });
}); // _closeConnection
describe('_purgeTables', function () {
});
}); // _purgeTables
+ describe('_topicChanged', function () {
+ beforeEach(function () {
+ db.cache = new Map();
+ sinon.stub(db.cache, 'delete');
+ });
+ after(function () {
+ delete db.cache;
+ });
+ it('covers', function () {
+ db._topicChanged('topic-id');
+ assert(db.cache.delete.called);
+ });
+ it('ignores ping', function () {
+ db._topicChanged('ping');
+ assert(!db.cache.delete.called);
+ });
+ }); // _topicChanged
+
+ describe('_listenerEstablished', function () {
+ it('creates cache', function () {
+ delete db.cache;
+ db._listenerEstablished();
+ assert(db.cache instanceof Map);
+ });
+ }); // _listenerEstablished
+
+ describe('_listenerLost', function () {
+ it('removes cache', function () {
+ db.cache = new Map();
+ db._listenerLost();
+ assert(!db.cache);
+ });
+ }); // _listenerLost
+
+ describe('_cacheGet', function () {
+ let key;
+ beforeEach(function () {
+ key = 'key';
+ });
+ it('nothing if no cache', function () {
+ delete db.cache;
+ const result = db._cacheGet(key);
+ assert.strictEqual(result, undefined);
+ });
+ it('nothing if no entry', function () {
+ db.cache = new Map();
+ const result = db._cacheGet(key);
+ assert.strictEqual(result, undefined);
+ });
+ it('returns cached entry', function () {
+ db.cache = new Map();
+ const expected = {
+ foo: 'bar',
+ };
+ db._cacheSet(key, expected);
+ const result = db._cacheGet(key);
+ assert.deepStrictEqual(result, expected);
+ });
+ }); // _cacheGet
+
+ describe('_cacheSet', function () {
+ let key;
+ beforeEach(function () {
+ key = 'key';
+ });
+ it('covers no cache', function () {
+ delete db.cache;
+ db._cacheSet(key, 'data');
+ });
+ it('covers cache', function () {
+ db.cache = new Map();
+ const expected = 'blah';
+ db._cacheSet(key, expected);
+ const result = db._cacheGet(key);
+ assert.deepStrictEqual(result, expected);
+ });
+ }); // _cacheSet
+
describe('context', function () {
it('covers', async function () {
await db.context(common.nop);
}); // topicGetByUrl
describe('topicGetContentById', function () {
+ let topic;
+ beforeEach(function () {
+ delete db.cache;
+ topic = {
+ id: topicId,
+ };
+ });
it('success', async function() {
- const expected = { id: topicId };
+ const expected = topic;
sinon.stub(db.db, 'oneOrNone').resolves(expected);
const result = await db.topicGetContentById(dbCtx, topicId);
assert.deepStrictEqual(result, expected);
assert.deepStrictEqual(e, expected);
}
});
+ it('caches success', async function () {
+ db.cache = new Map();
+ const expected = topic;
+ sinon.stub(db.db, 'oneOrNone').resolves(expected);
+ const result = await db.topicGetContentById(dbCtx, topicId);
+ assert.deepStrictEqual(result, expected);
+ });
+ it('covers cached entry', async function() {
+ let result;
+ db.cache = new Map();
+ const expected = topic;
+ sinon.stub(db.db, 'oneOrNone').resolves(expected);
+ result = await db.topicGetContentById(dbCtx, topicId);
+ assert.deepStrictEqual(result, expected);
+ result = await db.topicGetContentById(dbCtx, topicId);
+ assert.deepStrictEqual(result, expected);
+ });
}); // topicGetContentById
describe('topicSet', function () {
}
});
- });
+ }); // topicUpdate
describe('verificationClaim', function () {
it('success', async function() {
'authenticationGet',
'authenticationUpsert',
'healthCheck',
- 'schemaCheck',
+ 'initialize',
'subscriptionsByTopicId',
'subscriptionCountByTopicUrl',
'subscriptionDelete',