## [Unreleased]
+### Fixed
+
+- Reverted change introduced in v1.1.3 which consolidated db connections, as it was causing data-integrity issues.
+- Issue with SQLite backend causing admin details about subscriptions to show errors.
+- Verifications will not be attempted until their topics become active, reducing worker activity in certain situations.
+
## [v1.1.4] - 2021-08-16
### Fixed
{
"name": "websub-hub",
- "version": "1.1.4",
+ "version": "1.1.5",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
{
"name": "websub-hub",
- "version": "1.1.4",
+ "version": "1.1.5",
"description": "A WebSub Hub server implementation.",
"main": "server.js",
"scripts": {
throw new Errors.InternalInconsistencyError(verification.mode);
}
- await this.db.verificationComplete(dbCtx, verificationId, verification.callback, verification.topicId);
+ await this.db.verificationComplete(txCtx, verificationId, verification.callback, verification.topicId);
}); // txCtx
this.logger.info(_scope, 'verification complete', { ...logInfoData, verificationAccepted });
if (wanted > 0) {
// Update topics before anything else.
const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
- topicFetchPromises = topicFetchIds.map((id) => this.topicFetchProcess(dbCtx, id, requestId));
+ topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId)));
inProgress.push(...topicFetchPromises);
wanted -= topicFetchPromises.length;
}
if (wanted > 0) {
// Then any pending verifications.
const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
- verificationPromises = verifications.map((id) => this.verificationProcess(dbCtx, id, requestId));
+ verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId)));
inProgress.push(...verificationPromises);
wanted -= verificationPromises.length;
}
if (wanted > 0) {
// Finally dole out content.
const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
- updatePromises = updates.map((id) => this.subscriptionDeliveryProcess(dbCtx, id, requestId));
+ updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId)));
inProgress.push(...updatePromises);
wanted -= updatePromises.length;
}
max: {
major: 1,
minor: 0,
- patch: 0,
+ patch: 1,
},
};
SELECT *
FROM verification
WHERE
- (topic_id, callback, created) IN (SELECT topic_id, callback, max(created) AS created FROM verification GROUP BY (topic_id, callback))
+ (topic_id, callback, created) IN (SELECT topic_id, callback, max(created) AS created FROM verification GROUP BY topic_id, callback)
AND
(topic_id, callback) NOT IN (SELECT topic_id, callback FROM verification_in_progress_active)
AND
--- /dev/null
+BEGIN;
+ -- Ignore verifications with topics which are not yet active.
+ CREATE OR REPLACE VIEW verification_needed AS
+ SELECT v.*
+ FROM verification v JOIN topic t ON v.topic_id = t.id
+ WHERE
+ t.is_active
+ AND
+ (v.topic_id, v.callback, v.created) IN (SELECT topic_id, callback, max(created) AS created FROM verification GROUP BY topic_id, callback)
+ AND
+ (v.topic_id, v.callback) NOT IN (SELECT topic_id, callback FROM verification_in_progress_active)
+ AND
+ v.next_attempt <= now()
+ ;
+
+ INSERT INTO _meta_schema_version (major, minor, patch) VALUES (1, 0, 1);
+COMMIT;
+
--- /dev/null
+BEGIN;
+ CREATE OR REPLACE VIEW verification_needed AS
+ SELECT *
+ FROM verification
+ WHERE
+ (topic_id, callback, created) IN (SELECT topic_id, callback, max(created) AS created FROM verification GROUP BY topic_id, callback)
+ AND
+ (topic_id, callback) NOT IN (SELECT topic_id, callback FROM verification_in_progress_active)
+ AND
+ next_attempt <= now()
+ ;
+
+ DELETE FROM _meta_schema_version WHERE major = 1 AND minor = 0 AND patch = 1;
+COMMIT;
+
max: {
major: 1,
minor: 0,
- patch: 0,
+ patch: 1,
},
};
}
+ /**
+ * Converts engine subscription fields to native types.
+ * @param {Object} data
+ */
+ static _subscriptionDataToNative(data) {
+ const epochToDate = (epoch) => new Date(Number(epoch) * 1000);
+ if (data) {
+ ['created', 'verified', 'expires', 'contentDelivered'].forEach((field) => {
+ // eslint-disable-next-line security/detect-object-injection
+ data[field] = epochToDate(data[field]);
+ });
+ }
+ return data;
+ }
+
+
subscriptionsByTopicId(dbCtx, topicId) {
const _scope = _fileScope('subscriptionsByTopicId');
this.logger.debug(_scope, 'called', { topicId });
try {
- return this.statement.subscriptionsByTopicId.all({ topicId });
+ const subscriptions = this.statement.subscriptionsByTopicId.all({ topicId });
+ return subscriptions.map((s) => DatabaseSQLite._subscriptionDataToNative(s));
} catch (e) {
this.logger.error(_scope, 'failed', { error: e, topicId });
throw e;
let subscription;
try {
subscription = this.statement.subscriptionGet.get({ callback, topicId });
- return subscription;
+ return DatabaseSQLite._subscriptionDataToNative(subscription);
} catch (e) {
this.logger.error(_scope, 'failed', { error: e, callback, topicId });
throw e;
let subscription;
try {
subscription = this.statement.subscriptionGetById.get({ subscriptionId });
- return subscription;
+ return DatabaseSQLite._subscriptionDataToNative(subscription);
} catch (e) {
this.logger.error(_scope, 'failed', { error: e, subscriptionId });
throw e;
--- /dev/null
+BEGIN;
+ DROP VIEW verification_needed;
+ CREATE VIEW verification_needed AS
+ SELECT v.*
+ FROM verification v JOIN topic t ON v.topic_id = t.id
+ WHERE
+ t.is_active
+ AND
+ (v.topic_id, v.callback, v.created) IN (SELECT topic_id, callback, max(created) AS created FROM verification GROUP BY topic_id, callback)
+ AND
+ (v.topic_id, v.callback) NOT IN (SELECT topic_id, callback FROM verification_in_progress_active)
+ AND
+ v.next_attempt <= (strftime('%s', 'now'))
+ ;
+
+ INSERT INTO _meta_schema_version (major, minor, patch) VALUES (1, 0, 1);
+COMMIT;
+
--- /dev/null
+BEGIN;
+ DROP VIEW verification_needed;
+ CREATE VIEW verification_needed AS
+ SELECT *
+ FROM verification
+ WHERE
+ (topic_id, callback, created) IN (SELECT topic_id, callback, max(created) AS created FROM verification GROUP BY topic_id, callback)
+ AND
+ (topic_id, callback) NOT IN (SELECT topic_id, callback FROM verification_in_progress_active)
+ AND
+ next_attempt <= (strftime('%s', 'now'))
+ ;
+
+ DELETE FROM _meta_schema_version WHERE major = 1 AND minor = 0 AND patch = 1;
+COMMIT;
+
// Interrupt any pending sleep, if we were called out of timeout-cycle.
clearTimeout(this.nextTimeout);
- // Share one db connection for all tasks.
try {
await this.db.context(async (dbCtx) => {
res.setHeader('Link', behavior.selfLink + (behavior.hubLink ? `, ${behavior.hubLink}` : ''));
res.statusCode = behavior.statusCode;
res.end(behavior.content);
- this.logger.info({ method: req.method, statusCode: res.statusCode });
+ this.logger.info({ method: req.method, statusCode: res.statusCode, url: req.url });
}
async putId(req, res, ctx) {
res.statusCode = behavior ? behavior.statusCode : 404;
const response = (behavior && behavior.matchChallenge) ? ctx.queryParams['hub.challenge'] : (behavior && behavior.response);
res.end(response);
- this.logger.info({ method: req.method, statusCode: res.statusCode, matchChallenge: !!(behavior && behavior.matchChallenge) });
+ this.logger.info({ method: req.method, statusCode: res.statusCode, matchChallenge: !!(behavior && behavior.matchChallenge), url: req.url });
}
async postId(req, res, ctx) {
behavior.content = ctx.rawBody;
}
res.end();
- this.logger.info({ content: behavior && behavior.content, method: req.method, statusCode: res.statusCode, matchChallenge: !!(behavior && behavior.matchChallenge) });
+ this.logger.info({ content: behavior && behavior.content, method: req.method, statusCode: res.statusCode, matchChallenge: !!(behavior && behavior.matchChallenge), url: req.url });
}
async putVerify(req, res, ctx) {
}
async deleteId(req, res, ctx) {
- this.setResponseType(this.responseTypes, req, res, ctx);
- this.contentBehaviors.delete(ctx.params.id);
- this.verifyBehaviors.delete(ctx.params.id);
- res.statusCode = 200;
- res.end();
+ this.setResponseType(this.responseTypes, req, res, ctx);
+ this.contentBehaviors.delete(ctx.params.id);
+ this.verifyBehaviors.delete(ctx.params.id);
+ res.statusCode = 200;
+ res.end();
}
} // SubscriberFake
describe('subscriptionsByTopicId', function () {
it('success', async function () {
- const expected = { count: 3 };
+ const expected = [{ id: 3 }];
sinon.stub(db.statement.subscriptionsByTopicId, 'all').returns(expected);
const result = await db.subscriptionsByTopicId(dbCtx, topicUrl);
assert.deepStrictEqual(result, expected);