X-Git-Url: https://git.squeep.com/?a=blobdiff_plain;f=src%2Fcommunication.js;h=feda5887146c6b70ae242e1daf52ab3fbbe10a44;hb=43898cdd317a127bc45e8b3cb2f160df386760a1;hp=72f2642a003e465aab42909ce13f90b8fd93694c;hpb=9696c012e6b9a6c58904baa397ca0ebf78112316;p=websub-hub diff --git a/src/communication.js b/src/communication.js index 72f2642..feda588 100644 --- a/src/communication.js +++ b/src/communication.js @@ -49,7 +49,7 @@ class Communication { return response; }); - this.worker = new Worker(logger, this.workFeed.bind(this), options); + this.worker = new Worker(logger, db, this.workFeed.bind(this), options); this.worker.start(); } @@ -57,7 +57,7 @@ class Communication { static userAgentString(userAgentConfig) { // eslint-disable-next-line security/detect-object-injection const _conf = (field, def) => (userAgentConfig && field in userAgentConfig) ? userAgentConfig[field] : def; - const product = _conf('product', packageName); + const product = _conf('product', packageName).split('/').pop(); const version = _conf('version', packageVersion); let implementation = _conf('implementation', Enum.Specification); if (implementation) { @@ -196,6 +196,8 @@ class Communication { const acceptPreferred = [topic.contentType, acceptWildcard].filter((x) => x).join(', '); return Communication._axiosConfig('GET', topic.url, undefined, {}, { [Enum.Header.Accept]: acceptPreferred, + ...(topic.httpEtag && { [Enum.Header.IfNoneMatch]: topic.httpEtag }), + ...(topic.httpLastModified && { [Enum.Header.IfModifiedSince]: topic.httpLastModified }), }); } @@ -218,11 +220,12 @@ class Communication { const topic = await this.db.topicGetById(dbCtx, verification.topicId); if (!topic) { - this.logger.error(_scope, 'no such topic id', { verification, requestId }); - throw new Errors.InternalInconsistencyError('no such topic id'); + this.logger.error(_scope, Enum.Message.NoSuchTopicId, { verification, requestId }); + throw new Errors.InternalInconsistencyError(Enum.Message.NoSuchTopicId); } if (!topic.isActive) { + // These should be filtered out when selecting verification tasks to process. this.logger.debug(_scope, 'topic not active, skipping verification', { verification, requestId }); await this.db.verificationRelease(dbCtx, verificationId); return; @@ -328,11 +331,19 @@ class Communication { case Enum.Mode.Unsubscribe: if (verificationAccepted) { await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId); + if (topic.isDeleted) { + // Remove a deleted topic after the last subscription is notified. + await this.db.topicPendingDelete(txCtx, topic.id); + } } break; case Enum.Mode.Denied: await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId); + if (topic.isDeleted) { + // Remove a deleted topic after he last subscription is notified. + await this.db.topicPendingDelete(txCtx, topic.id); + } break; default: @@ -340,7 +351,7 @@ class Communication { throw new Errors.InternalInconsistencyError(verification.mode); } - await this.db.verificationComplete(dbCtx, verificationId, verification.callback, verification.topicId); + await this.db.verificationComplete(txCtx, verificationId, verification.callback, verification.topicId); }); // txCtx this.logger.info(_scope, 'verification complete', { ...logInfoData, verificationAccepted }); @@ -427,11 +438,14 @@ class Communication { const topic = await this.db.topicGetById(dbCtx, topicId); if (topic === undefined) { - this.logger.error(_scope, 'no such topic id', logInfoData); - throw new Errors.InternalInconsistencyError('no such topic id'); + this.logger.error(_scope, Enum.Message.NoSuchTopicId, logInfoData); + throw new Errors.InternalInconsistencyError(Enum.Message.NoSuchTopicId); } - logInfoData.url = topicId.url; + // Cull any expired subscriptions + await this.db.subscriptionDeleteExpired(dbCtx, topicId); + + logInfoData.url = topic.url; if (topic.isDeleted) { this.logger.debug(_scope, 'topic deleted, skipping update request', logInfoData); @@ -455,6 +469,7 @@ class Communication { switch (common.httpStatusCodeClass(response.status)) { case 2: + case 3: // Fall out of switch on success break; @@ -469,6 +484,12 @@ class Communication { return; } + if (response.status === 304) { + this.logger.info(_scope, 'content has not changed, per server', logInfoData); + await this.db.topicFetchComplete(dbCtx, topicId); + return; + } + const contentHash = Communication.contentHash(response.data, topic.contentHashAlgorithm); logInfoData.contentHash = contentHash; if (topic.contentHash === contentHash) { @@ -479,18 +500,22 @@ class Communication { const validHub = await this.linkHelper.validHub(topic.url, response.headers, response.data); if (!validHub) { - this.logger.debug(_scope, 'retrieved topic does not list us as hub', { logInfoData }); + this.logger.info(_scope, 'retrieved topic does not list us as hub', { logInfoData }); if (this.options.communication.strictTopicHubLink) { await this.db.transaction(dbCtx, async (txCtx) => { // Set as deleted and set content_updated so subscriptions are notified. await this.db.topicDeleted(txCtx, topicId); await this.db.topicFetchComplete(txCtx, topicId); }); + // Attempt to remove from db, if no active subscriptions. + await this.db.topicPendingDelete(dbCtx, topicId); return; } } const contentType = response.headers[Enum.Header.ContentType.toLowerCase()]; + const httpETag = response.headers[Enum.Header.ETag.toLowerCase()]; + const httpLastModified = response.headers[Enum.Header.LastModified.toLowerCase()]; await this.db.transaction(dbCtx, async (txCtx) => { await this.db.topicSetContent(txCtx, { @@ -498,6 +523,8 @@ class Communication { content: Buffer.from(response.data), contentHash, ...(contentType && { contentType }), + ...(httpETag && { httpETag }), + ...(httpLastModified && { httpLastModified }), }); await this.db.topicFetchComplete(txCtx, topicId); @@ -551,7 +578,7 @@ class Communication { await this.db.transaction(dbCtx, async (txCtx) => { await this.db.verificationInsert(txCtx, verification); - await this.db.subscriptionDeliveryComplete(txCtx, subscription.callback, subscription.topicId); + await this.db.subscriptionDeliveryComplete(txCtx, subscription.callback, subscription.topicId, topic.contentUpdated); }); this.logger.info(_scope, 'update unsubscription for deleted topic', logInfoData); return; @@ -603,7 +630,7 @@ class Communication { return; } - await this.db.subscriptionDeliveryComplete(dbCtx, subscription.callback, subscription.topicId); + await this.db.subscriptionDeliveryComplete(dbCtx, subscription.callback, subscription.topicId, topic.contentUpdated); this.logger.info(_scope, 'update success', logInfoData); } @@ -647,10 +674,11 @@ class Communication { /** * + * @param {*} dbCtx * @param {Number} wanted maximum tasks to claim * @returns {Promise[]} */ - async workFeed(wanted) { + async workFeed(dbCtx, wanted) { const _scope = _fileScope('workFeed'); const inProgress = []; const requestId = common.requestId(); @@ -661,33 +689,29 @@ class Communication { this.logger.debug(_scope, 'called', { wanted }); try { - await this.db.context(async (dbCtx) => { - if (wanted > 0) { - // Update topics before anything else. - const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - // Each task gets a new context, as these map to connections in some dbs. - // This dbCtx goes away after launching the processing functions, so would not be available to tasks. - topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId))); - inProgress.push(...topicFetchPromises); - wanted -= topicFetchPromises.length; - } + if (wanted > 0) { + // Update topics before anything else. + const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); + topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId))); + inProgress.push(...topicFetchPromises); + wanted -= topicFetchPromises.length; + } - if (wanted > 0) { - // Then any pending verifications. - const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId))); - inProgress.push(...verificationPromises); - wanted -= verificationPromises.length; - } + if (wanted > 0) { + // Then any pending verifications. + const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); + verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId))); + inProgress.push(...verificationPromises); + wanted -= verificationPromises.length; + } - if (wanted > 0) { - // Finally dole out content. - const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); - updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId))); - inProgress.push(...updatePromises); - wanted -= updatePromises.length; - } - }); // dbCtx + if (wanted > 0) { + // Finally dole out content. + const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId); + updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId))); + inProgress.push(...updatePromises); + wanted -= updatePromises.length; + } } catch (e) { this.logger.error(_scope, 'failed', { error: e }); // do not re-throw, return what we've claimed so far