X-Git-Url: https://git.squeep.com/?a=blobdiff_plain;f=src%2Fcommunication.js;h=dc4d464c70111b4ea2a8971575940c099d14b379;hb=d5e7908d3e60ee0cb3149163d4749563cdfafeb3;hp=b67e2c731ff122e09f4e68557a8ddf0b1dec927c;hpb=c4d2acfc78cc8b67649c2eaa60a8c6c34c3e6675;p=websub-hub diff --git a/src/communication.js b/src/communication.js index b67e2c7..dc4d464 100644 --- a/src/communication.js +++ b/src/communication.js @@ -57,7 +57,7 @@ class Communication { static userAgentString(userAgentConfig) { // eslint-disable-next-line security/detect-object-injection const _conf = (field, def) => (userAgentConfig && field in userAgentConfig) ? userAgentConfig[field] : def; - const product = _conf('product', packageName); + const product = _conf('product', packageName).split('/').pop(); const version = _conf('version', packageVersion); let implementation = _conf('implementation', Enum.Specification); if (implementation) { @@ -223,6 +223,7 @@ class Communication { } if (!topic.isActive) { + // These should be filtered out when selecting verification tasks to process. this.logger.debug(_scope, 'topic not active, skipping verification', { verification, requestId }); await this.db.verificationRelease(dbCtx, verificationId); return; @@ -328,11 +329,19 @@ class Communication { case Enum.Mode.Unsubscribe: if (verificationAccepted) { await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId); + if (topic.isDeleted) { + // Remove a deleted topic after the last subscription is notified. + await this.db.topicPendingDelete(txCtx, topic.id); + } } break; case Enum.Mode.Denied: await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId); + if (topic.isDeleted) { + // Remove a deleted topic after he last subscription is notified. + await this.db.topicPendingDelete(txCtx, topic.id); + } break; default: @@ -340,7 +349,7 @@ class Communication { throw new Errors.InternalInconsistencyError(verification.mode); } - await this.db.verificationComplete(dbCtx, verificationId, verification.callback, verification.topicId); + await this.db.verificationComplete(txCtx, verificationId, verification.callback, verification.topicId); }); // txCtx this.logger.info(_scope, 'verification complete', { ...logInfoData, verificationAccepted }); @@ -431,7 +440,10 @@ class Communication { throw new Errors.InternalInconsistencyError('no such topic id'); } - logInfoData.url = topicId.url; + // Cull any expired subscriptions + await this.db.subscriptionDeleteExpired(dbCtx, topicId); + + logInfoData.url = topic.url; if (topic.isDeleted) { this.logger.debug(_scope, 'topic deleted, skipping update request', logInfoData); @@ -479,13 +491,15 @@ class Communication { const validHub = await this.linkHelper.validHub(topic.url, response.headers, response.data); if (!validHub) { - this.logger.debug(_scope, 'retrieved topic does not list us as hub', { logInfoData }); + this.logger.info(_scope, 'retrieved topic does not list us as hub', { logInfoData }); if (this.options.communication.strictTopicHubLink) { await this.db.transaction(dbCtx, async (txCtx) => { // Set as deleted and set content_updated so subscriptions are notified. await this.db.topicDeleted(txCtx, topicId); await this.db.topicFetchComplete(txCtx, topicId); }); + // Attempt to remove from db, if no active subscriptions. + await this.db.topicPendingDelete(dbCtx, topicId); return; } } @@ -551,7 +565,7 @@ class Communication { await this.db.transaction(dbCtx, async (txCtx) => { await this.db.verificationInsert(txCtx, verification); - await this.db.subscriptionDeliveryComplete(txCtx, subscription.callback, subscription.topicId); + await this.db.subscriptionDeliveryComplete(txCtx, subscription.callback, subscription.topicId, topic.contentUpdated); }); this.logger.info(_scope, 'update unsubscription for deleted topic', logInfoData); return; @@ -603,7 +617,7 @@ class Communication { return; } - await this.db.subscriptionDeliveryComplete(dbCtx, subscription.callback, subscription.topicId); + await this.db.subscriptionDeliveryComplete(dbCtx, subscription.callback, subscription.topicId, topic.contentUpdated); this.logger.info(_scope, 'update success', logInfoData); } @@ -665,7 +679,7 @@ class Communication { if (wanted > 0) { // Update topics before anything else. const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - topicFetchPromises = topicFetchIds.map((id) => this.topicFetchProcess(dbCtx, id, requestId)); + topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId))); inProgress.push(...topicFetchPromises); wanted -= topicFetchPromises.length; } @@ -673,7 +687,7 @@ class Communication { if (wanted > 0) { // Then any pending verifications. const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - verificationPromises = verifications.map((id) => this.verificationProcess(dbCtx, id, requestId)); + verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId))); inProgress.push(...verificationPromises); wanted -= verificationPromises.length; } @@ -681,7 +695,7 @@ class Communication { if (wanted > 0) { // Finally dole out content. const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - updatePromises = updates.map((id) => this.subscriptionDeliveryProcess(dbCtx, id, requestId)); + updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId))); inProgress.push(...updatePromises); wanted -= updatePromises.length; }