From: Justin Wind Date: Mon, 23 Aug 2021 22:06:12 +0000 (-0700) Subject: Merge branch 'v1.1-dev' as v1.1.5 X-Git-Tag: v1.1.5 X-Git-Url: http://git.squeep.com/?p=websub-hub;a=commitdiff_plain;h=b2ddc9bc66b20975110561d7b3580ca1f5b9a7ce;hp=ed6dc5a66ce0eaf2dd61f9fb7a5ec048944c68ee Merge branch 'v1.1-dev' as v1.1.5 --- diff --git a/CHANGELOG.md b/CHANGELOG.md index e7fb95b..afd0ba8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ Releases and notable changes to this project are documented here. ## [Unreleased] +### Fixed + +- Reverted change introduced in v1.1.3 which consolidated db connections, as it was causing data-integrity issues. +- Issue with SQLite backend causing admin details about subscriptions to show errors. +- Verifications will not be attempted until their topics become active, reducing worker activity in certain situations. + ## [v1.1.4] - 2021-08-16 ### Fixed diff --git a/package-lock.json b/package-lock.json index 40533b6..c0aa159 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "websub-hub", - "version": "1.1.4", + "version": "1.1.5", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index 1b3587f..f6971cd 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "websub-hub", - "version": "1.1.4", + "version": "1.1.5", "description": "A WebSub Hub server implementation.", "main": "server.js", "scripts": { diff --git a/src/communication.js b/src/communication.js index b67e2c7..345714f 100644 --- a/src/communication.js +++ b/src/communication.js @@ -340,7 +340,7 @@ class Communication { throw new Errors.InternalInconsistencyError(verification.mode); } - await this.db.verificationComplete(dbCtx, verificationId, verification.callback, verification.topicId); + await this.db.verificationComplete(txCtx, verificationId, verification.callback, verification.topicId); }); // txCtx this.logger.info(_scope, 'verification complete', { ...logInfoData, verificationAccepted }); @@ -665,7 +665,7 @@ class Communication { if (wanted > 0) { // Update topics before anything else. const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - topicFetchPromises = topicFetchIds.map((id) => this.topicFetchProcess(dbCtx, id, requestId)); + topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId))); inProgress.push(...topicFetchPromises); wanted -= topicFetchPromises.length; } @@ -673,7 +673,7 @@ class Communication { if (wanted > 0) { // Then any pending verifications. const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - verificationPromises = verifications.map((id) => this.verificationProcess(dbCtx, id, requestId)); + verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId))); inProgress.push(...verificationPromises); wanted -= verificationPromises.length; } @@ -681,7 +681,7 @@ class Communication { if (wanted > 0) { // Finally dole out content. const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - updatePromises = updates.map((id) => this.subscriptionDeliveryProcess(dbCtx, id, requestId)); + updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId))); inProgress.push(...updatePromises); wanted -= updatePromises.length; } diff --git a/src/db/postgres/index.js b/src/db/postgres/index.js index 1c5d1d1..2559097 100644 --- a/src/db/postgres/index.js +++ b/src/db/postgres/index.js @@ -30,7 +30,7 @@ const schemaVersionsSupported = { max: { major: 1, minor: 0, - patch: 0, + patch: 1, }, }; diff --git a/src/db/postgres/sql/schema/1.0.0/apply.sql b/src/db/postgres/sql/schema/1.0.0/apply.sql index 2665882..24e955d 100644 --- a/src/db/postgres/sql/schema/1.0.0/apply.sql +++ b/src/db/postgres/sql/schema/1.0.0/apply.sql @@ -174,7 +174,7 @@ BEGIN; SELECT * FROM verification WHERE - (topic_id, callback, created) IN (SELECT topic_id, callback, max(created) AS created FROM verification GROUP BY (topic_id, callback)) + (topic_id, callback, created) IN (SELECT topic_id, callback, max(created) AS created FROM verification GROUP BY topic_id, callback) AND (topic_id, callback) NOT IN (SELECT topic_id, callback FROM verification_in_progress_active) AND diff --git a/src/db/postgres/sql/schema/1.0.1/apply.sql b/src/db/postgres/sql/schema/1.0.1/apply.sql new file mode 100644 index 0000000..32eef6a --- /dev/null +++ b/src/db/postgres/sql/schema/1.0.1/apply.sql @@ -0,0 +1,18 @@ +BEGIN; + -- Ignore verifications with topics which are not yet active. + CREATE OR REPLACE VIEW verification_needed AS + SELECT v.* + FROM verification v JOIN topic t ON v.topic_id = t.id + WHERE + t.is_active + AND + (v.topic_id, v.callback, v.created) IN (SELECT topic_id, callback, max(created) AS created FROM verification GROUP BY topic_id, callback) + AND + (v.topic_id, v.callback) NOT IN (SELECT topic_id, callback FROM verification_in_progress_active) + AND + v.next_attempt <= now() + ; + + INSERT INTO _meta_schema_version (major, minor, patch) VALUES (1, 0, 1); +COMMIT; + diff --git a/src/db/postgres/sql/schema/1.0.1/revert.sql b/src/db/postgres/sql/schema/1.0.1/revert.sql new file mode 100644 index 0000000..37813c6 --- /dev/null +++ b/src/db/postgres/sql/schema/1.0.1/revert.sql @@ -0,0 +1,15 @@ +BEGIN; + CREATE OR REPLACE VIEW verification_needed AS + SELECT * + FROM verification + WHERE + (topic_id, callback, created) IN (SELECT topic_id, callback, max(created) AS created FROM verification GROUP BY topic_id, callback) + AND + (topic_id, callback) NOT IN (SELECT topic_id, callback FROM verification_in_progress_active) + AND + next_attempt <= now() + ; + + DELETE FROM _meta_schema_version WHERE major = 1 AND minor = 0 AND patch = 1; +COMMIT; + diff --git a/src/db/sqlite/index.js b/src/db/sqlite/index.js index a4c3d38..fba4e7c 100644 --- a/src/db/sqlite/index.js +++ b/src/db/sqlite/index.js @@ -20,7 +20,7 @@ const schemaVersionsSupported = { max: { major: 1, minor: 0, - patch: 0, + patch: 1, }, }; @@ -299,12 +299,29 @@ class DatabaseSQLite extends Database { } + /** + * Converts engine subscription fields to native types. + * @param {Object} data + */ + static _subscriptionDataToNative(data) { + const epochToDate = (epoch) => new Date(Number(epoch) * 1000); + if (data) { + ['created', 'verified', 'expires', 'contentDelivered'].forEach((field) => { + // eslint-disable-next-line security/detect-object-injection + data[field] = epochToDate(data[field]); + }); + } + return data; + } + + subscriptionsByTopicId(dbCtx, topicId) { const _scope = _fileScope('subscriptionsByTopicId'); this.logger.debug(_scope, 'called', { topicId }); try { - return this.statement.subscriptionsByTopicId.all({ topicId }); + const subscriptions = this.statement.subscriptionsByTopicId.all({ topicId }); + return subscriptions.map((s) => DatabaseSQLite._subscriptionDataToNative(s)); } catch (e) { this.logger.error(_scope, 'failed', { error: e, topicId }); throw e; @@ -463,7 +480,7 @@ class DatabaseSQLite extends Database { let subscription; try { subscription = this.statement.subscriptionGet.get({ callback, topicId }); - return subscription; + return DatabaseSQLite._subscriptionDataToNative(subscription); } catch (e) { this.logger.error(_scope, 'failed', { error: e, callback, topicId }); throw e; @@ -478,7 +495,7 @@ class DatabaseSQLite extends Database { let subscription; try { subscription = this.statement.subscriptionGetById.get({ subscriptionId }); - return subscription; + return DatabaseSQLite._subscriptionDataToNative(subscription); } catch (e) { this.logger.error(_scope, 'failed', { error: e, subscriptionId }); throw e; diff --git a/src/db/sqlite/sql/schema/1.0.1/apply.sql b/src/db/sqlite/sql/schema/1.0.1/apply.sql new file mode 100644 index 0000000..6a6552c --- /dev/null +++ b/src/db/sqlite/sql/schema/1.0.1/apply.sql @@ -0,0 +1,18 @@ +BEGIN; + DROP VIEW verification_needed; + CREATE VIEW verification_needed AS + SELECT v.* + FROM verification v JOIN topic t ON v.topic_id = t.id + WHERE + t.is_active + AND + (v.topic_id, v.callback, v.created) IN (SELECT topic_id, callback, max(created) AS created FROM verification GROUP BY topic_id, callback) + AND + (v.topic_id, v.callback) NOT IN (SELECT topic_id, callback FROM verification_in_progress_active) + AND + v.next_attempt <= (strftime('%s', 'now')) + ; + + INSERT INTO _meta_schema_version (major, minor, patch) VALUES (1, 0, 1); +COMMIT; + diff --git a/src/db/sqlite/sql/schema/1.0.1/revert.sql b/src/db/sqlite/sql/schema/1.0.1/revert.sql new file mode 100644 index 0000000..bff18bf --- /dev/null +++ b/src/db/sqlite/sql/schema/1.0.1/revert.sql @@ -0,0 +1,16 @@ +BEGIN; + DROP VIEW verification_needed; + CREATE VIEW verification_needed AS + SELECT * + FROM verification + WHERE + (topic_id, callback, created) IN (SELECT topic_id, callback, max(created) AS created FROM verification GROUP BY topic_id, callback) + AND + (topic_id, callback) NOT IN (SELECT topic_id, callback FROM verification_in_progress_active) + AND + next_attempt <= (strftime('%s', 'now')) + ; + + DELETE FROM _meta_schema_version WHERE major = 1 AND minor = 0 AND patch = 1; +COMMIT; + diff --git a/src/worker.js b/src/worker.js index 4f566ad..ebf8914 100644 --- a/src/worker.js +++ b/src/worker.js @@ -189,7 +189,6 @@ class Worker { // Interrupt any pending sleep, if we were called out of timeout-cycle. clearTimeout(this.nextTimeout); - // Share one db connection for all tasks. try { await this.db.context(async (dbCtx) => { diff --git a/test-e2e/fake-servers.js b/test-e2e/fake-servers.js index 0adff34..9d46d48 100644 --- a/test-e2e/fake-servers.js +++ b/test-e2e/fake-servers.js @@ -46,7 +46,7 @@ class TopicFake extends Dingus { res.setHeader('Link', behavior.selfLink + (behavior.hubLink ? `, ${behavior.hubLink}` : '')); res.statusCode = behavior.statusCode; res.end(behavior.content); - this.logger.info({ method: req.method, statusCode: res.statusCode }); + this.logger.info({ method: req.method, statusCode: res.statusCode, url: req.url }); } async putId(req, res, ctx) { @@ -99,7 +99,7 @@ class SubscriberFake extends Dingus { res.statusCode = behavior ? behavior.statusCode : 404; const response = (behavior && behavior.matchChallenge) ? ctx.queryParams['hub.challenge'] : (behavior && behavior.response); res.end(response); - this.logger.info({ method: req.method, statusCode: res.statusCode, matchChallenge: !!(behavior && behavior.matchChallenge) }); + this.logger.info({ method: req.method, statusCode: res.statusCode, matchChallenge: !!(behavior && behavior.matchChallenge), url: req.url }); } async postId(req, res, ctx) { @@ -112,7 +112,7 @@ class SubscriberFake extends Dingus { behavior.content = ctx.rawBody; } res.end(); - this.logger.info({ content: behavior && behavior.content, method: req.method, statusCode: res.statusCode, matchChallenge: !!(behavior && behavior.matchChallenge) }); + this.logger.info({ content: behavior && behavior.content, method: req.method, statusCode: res.statusCode, matchChallenge: !!(behavior && behavior.matchChallenge), url: req.url }); } async putVerify(req, res, ctx) { @@ -142,11 +142,11 @@ class SubscriberFake extends Dingus { } async deleteId(req, res, ctx) { - this.setResponseType(this.responseTypes, req, res, ctx); - this.contentBehaviors.delete(ctx.params.id); - this.verifyBehaviors.delete(ctx.params.id); - res.statusCode = 200; - res.end(); + this.setResponseType(this.responseTypes, req, res, ctx); + this.contentBehaviors.delete(ctx.params.id); + this.verifyBehaviors.delete(ctx.params.id); + res.statusCode = 200; + res.end(); } } // SubscriberFake diff --git a/test/src/db/sqlite.js b/test/src/db/sqlite.js index 310e87a..0c96df3 100644 --- a/test/src/db/sqlite.js +++ b/test/src/db/sqlite.js @@ -339,7 +339,7 @@ describe('DatabaseSQLite', function () { describe('subscriptionsByTopicId', function () { it('success', async function () { - const expected = { count: 3 }; + const expected = [{ id: 3 }]; sinon.stub(db.statement.subscriptionsByTopicId, 'all').returns(expected); const result = await db.subscriptionsByTopicId(dbCtx, topicUrl); assert.deepStrictEqual(result, expected);