Merge branch 'v1.1-dev' as v1.1.5 v1.1.5
authorJustin Wind <justin.wind+git@gmail.com>
Mon, 23 Aug 2021 22:06:12 +0000 (15:06 -0700)
committerJustin Wind <justin.wind+git@gmail.com>
Mon, 23 Aug 2021 22:06:12 +0000 (15:06 -0700)
14 files changed:
CHANGELOG.md
package-lock.json
package.json
src/communication.js
src/db/postgres/index.js
src/db/postgres/sql/schema/1.0.0/apply.sql
src/db/postgres/sql/schema/1.0.1/apply.sql [new file with mode: 0644]
src/db/postgres/sql/schema/1.0.1/revert.sql [new file with mode: 0644]
src/db/sqlite/index.js
src/db/sqlite/sql/schema/1.0.1/apply.sql [new file with mode: 0644]
src/db/sqlite/sql/schema/1.0.1/revert.sql [new file with mode: 0644]
src/worker.js
test-e2e/fake-servers.js
test/src/db/sqlite.js

index e7fb95b086b3187f83f133aa48ce44d95e4806c2..afd0ba87aeed96446f756361689d3a88b8515392 100644 (file)
@@ -4,6 +4,12 @@ Releases and notable changes to this project are documented here.
 
 ## [Unreleased]
 
+### Fixed
+
+- Reverted change introduced in v1.1.3 which consolidated db connections, as it was causing data-integrity issues.
+- Issue with SQLite backend causing admin details about subscriptions to show errors.
+- Verifications will not be attempted until their topics become active, reducing worker activity in certain situations.
+
 ## [v1.1.4] - 2021-08-16
 
 ### Fixed
index 40533b6b2e29ee80d4dada3bdf4a8ea1cec2bcb4..c0aa159da7addbada1820daa688f02d547e167fd 100644 (file)
@@ -1,6 +1,6 @@
 {
   "name": "websub-hub",
-  "version": "1.1.4",
+  "version": "1.1.5",
   "lockfileVersion": 1,
   "requires": true,
   "dependencies": {
index 1b3587f12db95ec286fe8486a621dfa9dd68bb53..f6971cd517963ac45d82d4e9c9105a5adb6da013 100644 (file)
@@ -1,6 +1,6 @@
 {
   "name": "websub-hub",
-  "version": "1.1.4",
+  "version": "1.1.5",
   "description": "A WebSub Hub server implementation.",
   "main": "server.js",
   "scripts": {
index b67e2c731ff122e09f4e68557a8ddf0b1dec927c..345714f328951f4f7964249a8884882390bb4abc 100644 (file)
@@ -340,7 +340,7 @@ class Communication {
           throw new Errors.InternalInconsistencyError(verification.mode);
       }
 
-      await this.db.verificationComplete(dbCtx, verificationId, verification.callback, verification.topicId);
+      await this.db.verificationComplete(txCtx, verificationId, verification.callback, verification.topicId);
     }); // txCtx
 
     this.logger.info(_scope, 'verification complete', { ...logInfoData, verificationAccepted });
@@ -665,7 +665,7 @@ class Communication {
       if (wanted > 0) {
         // Update topics before anything else.
         const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
-        topicFetchPromises = topicFetchIds.map((id) => this.topicFetchProcess(dbCtx, id, requestId));
+        topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId)));
         inProgress.push(...topicFetchPromises);
         wanted -= topicFetchPromises.length;
       }
@@ -673,7 +673,7 @@ class Communication {
       if (wanted > 0) {
         // Then any pending verifications.
         const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
-        verificationPromises = verifications.map((id) => this.verificationProcess(dbCtx, id, requestId));
+        verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId)));
         inProgress.push(...verificationPromises);
         wanted -= verificationPromises.length;
       }
@@ -681,7 +681,7 @@ class Communication {
       if (wanted > 0) {
         // Finally dole out content.
         const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
-        updatePromises = updates.map((id) => this.subscriptionDeliveryProcess(dbCtx, id, requestId));
+        updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId)));
         inProgress.push(...updatePromises);
         wanted -= updatePromises.length;
       }
index 1c5d1d1c4f2253a79c3053232781f2bbb7e45f56..255909780fe71c41133fc4b29c30eb5ec563f27a 100644 (file)
@@ -30,7 +30,7 @@ const schemaVersionsSupported = {
   max: {
     major: 1,
     minor: 0,
-    patch: 0,
+    patch: 1,
   },
 };
 
index 2665882108cef64db73e9293442edac259020cff..24e955dbfc0c26423e998eff3870067164cecf89 100644 (file)
@@ -174,7 +174,7 @@ BEGIN;
                SELECT *
                FROM verification
                WHERE
-                       (topic_id, callback, created) IN (SELECT topic_id, callback, max(created) AS created FROM verification GROUP BY (topic_id, callback))
+                       (topic_id, callback, created) IN (SELECT topic_id, callback, max(created) AS created FROM verification GROUP BY topic_id, callback)
                AND
                        (topic_id, callback) NOT IN (SELECT topic_id, callback FROM verification_in_progress_active)
                AND
diff --git a/src/db/postgres/sql/schema/1.0.1/apply.sql b/src/db/postgres/sql/schema/1.0.1/apply.sql
new file mode 100644 (file)
index 0000000..32eef6a
--- /dev/null
@@ -0,0 +1,18 @@
+BEGIN;
+       -- Ignore verifications with topics which are not yet active.
+       CREATE OR REPLACE VIEW verification_needed AS
+               SELECT v.*
+               FROM verification v JOIN topic t ON v.topic_id = t.id
+               WHERE
+                       t.is_active
+               AND
+                       (v.topic_id, v.callback, v.created) IN (SELECT topic_id, callback, max(created) AS created FROM verification GROUP BY topic_id, callback)
+               AND
+                       (v.topic_id, v.callback) NOT IN (SELECT topic_id, callback FROM verification_in_progress_active)
+               AND
+                       v.next_attempt <= now()
+       ;
+
+       INSERT INTO _meta_schema_version (major, minor, patch) VALUES (1, 0, 1);
+COMMIT;
+
diff --git a/src/db/postgres/sql/schema/1.0.1/revert.sql b/src/db/postgres/sql/schema/1.0.1/revert.sql
new file mode 100644 (file)
index 0000000..37813c6
--- /dev/null
@@ -0,0 +1,15 @@
+BEGIN;
+       CREATE OR REPLACE VIEW verification_needed AS
+               SELECT *
+               FROM verification
+               WHERE
+                       (topic_id, callback, created) IN (SELECT topic_id, callback, max(created) AS created FROM verification GROUP BY topic_id, callback)
+               AND
+                       (topic_id, callback) NOT IN (SELECT topic_id, callback FROM verification_in_progress_active)
+               AND
+                       next_attempt <= now()
+       ;
+
+       DELETE FROM _meta_schema_version WHERE major = 1 AND minor = 0 AND patch = 1;
+COMMIT;
+
index a4c3d38fa0f252b94982f5b12832a4bb0815dbb7..fba4e7ca326f828a6f9698258d2ec4628fb7665e 100644 (file)
@@ -20,7 +20,7 @@ const schemaVersionsSupported = {
   max: {
     major: 1,
     minor: 0,
-    patch: 0,
+    patch: 1,
   },
 };
 
@@ -299,12 +299,29 @@ class DatabaseSQLite extends Database {
   }
 
 
+  /**
+   * Converts engine subscription fields to native types.
+   * @param {Object} data
+   */
+  static _subscriptionDataToNative(data) {
+    const epochToDate = (epoch) => new Date(Number(epoch) * 1000);
+    if (data) {
+      ['created', 'verified', 'expires', 'contentDelivered'].forEach((field) => {
+        // eslint-disable-next-line security/detect-object-injection
+        data[field] = epochToDate(data[field]);
+      });
+    }
+    return data;
+  }
+
+
   subscriptionsByTopicId(dbCtx, topicId) {
     const _scope = _fileScope('subscriptionsByTopicId');
     this.logger.debug(_scope, 'called', { topicId });
 
     try {
-      return this.statement.subscriptionsByTopicId.all({ topicId });
+      const subscriptions = this.statement.subscriptionsByTopicId.all({ topicId });
+      return subscriptions.map((s) => DatabaseSQLite._subscriptionDataToNative(s));
     } catch (e) {
       this.logger.error(_scope, 'failed', { error: e, topicId });
       throw e;
@@ -463,7 +480,7 @@ class DatabaseSQLite extends Database {
     let subscription;
     try {
       subscription = this.statement.subscriptionGet.get({ callback, topicId });
-      return subscription;
+      return DatabaseSQLite._subscriptionDataToNative(subscription);
     } catch (e) {
       this.logger.error(_scope, 'failed', { error: e, callback, topicId });
       throw e;
@@ -478,7 +495,7 @@ class DatabaseSQLite extends Database {
     let subscription;
     try {
       subscription = this.statement.subscriptionGetById.get({ subscriptionId });
-      return subscription;
+      return DatabaseSQLite._subscriptionDataToNative(subscription);
     } catch (e) {
       this.logger.error(_scope, 'failed', { error: e, subscriptionId });
       throw e;
diff --git a/src/db/sqlite/sql/schema/1.0.1/apply.sql b/src/db/sqlite/sql/schema/1.0.1/apply.sql
new file mode 100644 (file)
index 0000000..6a6552c
--- /dev/null
@@ -0,0 +1,18 @@
+BEGIN;
+       DROP VIEW verification_needed;
+       CREATE VIEW verification_needed AS
+               SELECT v.*
+               FROM verification v JOIN topic t ON v.topic_id = t.id
+               WHERE
+                       t.is_active
+               AND
+                       (v.topic_id, v.callback, v.created) IN (SELECT topic_id, callback, max(created) AS created FROM verification GROUP BY topic_id, callback)
+               AND
+                       (v.topic_id, v.callback) NOT IN (SELECT topic_id, callback FROM verification_in_progress_active)
+               AND
+                       v.next_attempt <= (strftime('%s', 'now'))
+       ;
+
+       INSERT INTO _meta_schema_version (major, minor, patch) VALUES (1, 0, 1);
+COMMIT;
+
diff --git a/src/db/sqlite/sql/schema/1.0.1/revert.sql b/src/db/sqlite/sql/schema/1.0.1/revert.sql
new file mode 100644 (file)
index 0000000..bff18bf
--- /dev/null
@@ -0,0 +1,16 @@
+BEGIN;
+       DROP VIEW verification_needed;
+       CREATE VIEW verification_needed AS
+               SELECT *
+               FROM verification
+               WHERE
+                       (topic_id, callback, created) IN (SELECT topic_id, callback, max(created) AS created FROM verification GROUP BY topic_id, callback)
+               AND
+                       (topic_id, callback) NOT IN (SELECT topic_id, callback FROM verification_in_progress_active)
+               AND
+                       next_attempt <= (strftime('%s', 'now'))
+       ;
+
+       DELETE FROM _meta_schema_version WHERE major = 1 AND minor = 0 AND patch = 1;
+COMMIT;
+
index 4f566ad9f97209bcc538692cf87b6e412ccbe877..ebf891405124e730881b96c1b69d4bf4593be689 100644 (file)
@@ -189,7 +189,6 @@ class Worker {
     // Interrupt any pending sleep, if we were called out of timeout-cycle.
     clearTimeout(this.nextTimeout);
 
-    // Share one db connection for all tasks.
     try {
       await this.db.context(async (dbCtx) => {
 
index 0adff34edadff9d0a1ae7c61cde230a9e8f8d8c3..9d46d485edca863ef8608dd4989f39044a56379c 100644 (file)
@@ -46,7 +46,7 @@ class TopicFake extends Dingus {
     res.setHeader('Link', behavior.selfLink + (behavior.hubLink ? `, ${behavior.hubLink}` : ''));
     res.statusCode = behavior.statusCode;
     res.end(behavior.content);
-    this.logger.info({ method: req.method, statusCode: res.statusCode });
+    this.logger.info({ method: req.method, statusCode: res.statusCode, url: req.url });
   }
 
   async putId(req, res, ctx) {
@@ -99,7 +99,7 @@ class SubscriberFake extends Dingus {
     res.statusCode = behavior ? behavior.statusCode : 404;
     const response = (behavior && behavior.matchChallenge) ? ctx.queryParams['hub.challenge'] : (behavior && behavior.response);
     res.end(response);
-    this.logger.info({ method: req.method, statusCode: res.statusCode, matchChallenge: !!(behavior && behavior.matchChallenge) });
+    this.logger.info({ method: req.method, statusCode: res.statusCode, matchChallenge: !!(behavior && behavior.matchChallenge), url: req.url });
   }
 
   async postId(req, res, ctx) {
@@ -112,7 +112,7 @@ class SubscriberFake extends Dingus {
       behavior.content = ctx.rawBody;
     }
     res.end();
-    this.logger.info({ content: behavior && behavior.content, method: req.method, statusCode: res.statusCode, matchChallenge: !!(behavior && behavior.matchChallenge) });
+    this.logger.info({ content: behavior && behavior.content, method: req.method, statusCode: res.statusCode, matchChallenge: !!(behavior && behavior.matchChallenge), url: req.url });
   }
 
   async putVerify(req, res, ctx) {
@@ -142,11 +142,11 @@ class SubscriberFake extends Dingus {
   }
 
   async deleteId(req, res, ctx) {
-      this.setResponseType(this.responseTypes, req, res, ctx);
-      this.contentBehaviors.delete(ctx.params.id);
-      this.verifyBehaviors.delete(ctx.params.id);
-      res.statusCode = 200;
-      res.end();
+    this.setResponseType(this.responseTypes, req, res, ctx);
+    this.contentBehaviors.delete(ctx.params.id);
+    this.verifyBehaviors.delete(ctx.params.id);
+    res.statusCode = 200;
+    res.end();
   }
 
 } // SubscriberFake
index 310e87ad3da7e1203b50de9f2ad3f1d19ed34405..0c96df364d1d4d5ecfb98b6de6e2a021a3c92a05 100644 (file)
@@ -339,7 +339,7 @@ describe('DatabaseSQLite', function () {
 
   describe('subscriptionsByTopicId', function () {
     it('success', async function () {
-      const expected = { count: 3 };
+      const expected = [{ id: 3 }];
       sinon.stub(db.statement.subscriptionsByTopicId, 'all').returns(expected);
       const result = await db.subscriptionsByTopicId(dbCtx, topicUrl);
       assert.deepStrictEqual(result, expected);