database migration 1.0.4, store topic fetch etag/last-modified, provide these when...
[websub-hub] / src / communication.js
index 72f2642a003e465aab42909ce13f90b8fd93694c..3b436786fabf3ebfddfb245b479da669894f8377 100644 (file)
@@ -49,7 +49,7 @@ class Communication {
       return response;
     });
 
-    this.worker = new Worker(logger, this.workFeed.bind(this), options);
+    this.worker = new Worker(logger, db, this.workFeed.bind(this), options);
     this.worker.start();
   }
 
@@ -57,7 +57,7 @@ class Communication {
   static userAgentString(userAgentConfig) {
     // eslint-disable-next-line security/detect-object-injection
     const _conf = (field, def) => (userAgentConfig && field in userAgentConfig) ? userAgentConfig[field] : def;
-    const product = _conf('product', packageName);
+    const product = _conf('product', packageName).split('/').pop();
     const version = _conf('version', packageVersion);
     let implementation = _conf('implementation', Enum.Specification);
     if (implementation) {
@@ -196,6 +196,8 @@ class Communication {
     const acceptPreferred = [topic.contentType, acceptWildcard].filter((x) => x).join(', ');
     return Communication._axiosConfig('GET', topic.url, undefined, {}, {
       [Enum.Header.Accept]: acceptPreferred,
+      ...(topic.httpEtag && { [Enum.Header.IfNoneMatch]: topic.httpEtag }),
+      ...(topic.httpLastModified && { [Enum.Header.IfModifiedSince]: topic.httpLastModified }),
     });
   }
 
@@ -223,6 +225,7 @@ class Communication {
     }
 
     if (!topic.isActive) {
+      // These should be filtered out when selecting verification tasks to process.
       this.logger.debug(_scope, 'topic not active, skipping verification', { verification, requestId });
       await this.db.verificationRelease(dbCtx, verificationId);
       return;
@@ -328,11 +331,19 @@ class Communication {
         case Enum.Mode.Unsubscribe:
           if (verificationAccepted) {
             await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
+            if (topic.isDeleted) {
+              // Remove a deleted topic after the last subscription is notified.
+              await this.db.topicPendingDelete(txCtx, topic.id);
+            }
           }
           break;
 
         case Enum.Mode.Denied:
           await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
+          if (topic.isDeleted) {
+            // Remove a deleted topic after the last subscription is notified.
+            await this.db.topicPendingDelete(txCtx, topic.id);
+          }
           break;
 
         default:
@@ -340,7 +351,7 @@ class Communication {
           throw new Errors.InternalInconsistencyError(verification.mode);
       }
 
-      await this.db.verificationComplete(dbCtx, verificationId, verification.callback, verification.topicId);
+      await this.db.verificationComplete(txCtx, verificationId, verification.callback, verification.topicId);
     }); // txCtx
 
     this.logger.info(_scope, 'verification complete', { ...logInfoData, verificationAccepted });
@@ -431,7 +442,10 @@ class Communication {
       throw new Errors.InternalInconsistencyError('no such topic id');
     }
 
-    logInfoData.url = topicId.url;
+    // Cull any expired subscriptions
+    await this.db.subscriptionDeleteExpired(dbCtx, topicId);
+
+    logInfoData.url = topic.url;
 
     if (topic.isDeleted) {
       this.logger.debug(_scope, 'topic deleted, skipping update request', logInfoData);
@@ -455,6 +469,7 @@ class Communication {
 
     switch (common.httpStatusCodeClass(response.status)) {
       case 2:
+      case 3:
         // Fall out of switch on success
         break;
 
@@ -469,6 +484,12 @@ class Communication {
         return;
     }
 
+    if (response.status === 304) {
+      this.logger.info(_scope, 'content has not changed, per server', logInfoData);
+      await this.db.topicFetchComplete(dbCtx, topicId);
+      return;
+    }
+
     const contentHash = Communication.contentHash(response.data, topic.contentHashAlgorithm);
     logInfoData.contentHash = contentHash;
     if (topic.contentHash === contentHash) {
@@ -479,18 +500,22 @@ class Communication {
 
     const validHub = await this.linkHelper.validHub(topic.url, response.headers, response.data);
     if (!validHub) {
-      this.logger.debug(_scope, 'retrieved topic does not list us as hub', { logInfoData });
+      this.logger.info(_scope, 'retrieved topic does not list us as hub', { logInfoData });
       if (this.options.communication.strictTopicHubLink) {
         await this.db.transaction(dbCtx, async (txCtx) => {
           // Set as deleted and set content_updated so subscriptions are notified.
           await this.db.topicDeleted(txCtx, topicId);
           await this.db.topicFetchComplete(txCtx, topicId);
         });
+        // Attempt to remove from db, if no active subscriptions.
+        await this.db.topicPendingDelete(dbCtx, topicId);
         return;
       }
     }
 
     const contentType = response.headers[Enum.Header.ContentType.toLowerCase()];
+    const httpETag = response.headers[Enum.Header.ETag.toLowerCase()];
+    const httpLastModified = response.headers[Enum.Header.LastModified.toLowerCase()];
 
     await this.db.transaction(dbCtx, async (txCtx) => {
       await this.db.topicSetContent(txCtx, {
@@ -498,6 +523,8 @@ class Communication {
         content: Buffer.from(response.data),
         contentHash,
         ...(contentType && { contentType }),
+        ...(httpETag && { httpETag }),
+        ...(httpLastModified && { httpLastModified }),
       });
 
       await this.db.topicFetchComplete(txCtx, topicId);
@@ -551,7 +578,7 @@ class Communication {
 
       await this.db.transaction(dbCtx, async (txCtx) => {
         await this.db.verificationInsert(txCtx, verification);
-        await this.db.subscriptionDeliveryComplete(txCtx, subscription.callback, subscription.topicId);
+        await this.db.subscriptionDeliveryComplete(txCtx, subscription.callback, subscription.topicId, topic.contentUpdated);
       });
       this.logger.info(_scope, 'update unsubscription for deleted topic', logInfoData);
       return;
@@ -603,7 +630,7 @@ class Communication {
         return;
     }
 
-    await this.db.subscriptionDeliveryComplete(dbCtx, subscription.callback, subscription.topicId);
+    await this.db.subscriptionDeliveryComplete(dbCtx, subscription.callback, subscription.topicId, topic.contentUpdated);
     this.logger.info(_scope, 'update success', logInfoData);
   }
 
@@ -647,10 +674,11 @@ class Communication {
 
   /**
    * 
+   * @param {*} dbCtx
    * @param {Number} wanted maximum tasks to claim
    * @returns {Promise<void>[]}
    */
-  async workFeed(wanted) {
+  async workFeed(dbCtx, wanted) {
     const _scope = _fileScope('workFeed');
     const inProgress = [];
     const requestId = common.requestId();
@@ -661,33 +689,29 @@ class Communication {
     this.logger.debug(_scope, 'called', { wanted });
 
     try {
-      await this.db.context(async (dbCtx) => {
-        if (wanted > 0) {
-          // Update topics before anything else.
-          const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
-          // Each task gets a new context, as these map to connections in some dbs.
-          // This dbCtx goes away after launching the processing functions, so would not be available to tasks.
-          topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId)));
-          inProgress.push(...topicFetchPromises);
-          wanted -= topicFetchPromises.length;
-        }
+      if (wanted > 0) {
+        // Update topics before anything else.
+        const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
+        topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId)));
+        inProgress.push(...topicFetchPromises);
+        wanted -= topicFetchPromises.length;
+      }
 
-        if (wanted > 0) {
-          // Then any pending verifications.
-          const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
-          verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId)));
-          inProgress.push(...verificationPromises);
-          wanted -= verificationPromises.length;
-        }
+      if (wanted > 0) {
+        // Then any pending verifications.
+        const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
+        verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId)));
+        inProgress.push(...verificationPromises);
+        wanted -= verificationPromises.length;
+      }
 
-        if (wanted > 0) {
-          // Finally dole out content.
-          const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
-          updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId)));
-          inProgress.push(...updatePromises);
-          wanted -= updatePromises.length;
-        }
-      }); // dbCtx
+      if (wanted > 0) {
+        // Finally dole out content.
+        const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
+        updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId)));
+        inProgress.push(...updatePromises);
+        wanted -= updatePromises.length;
+      }
     } catch (e) {
       this.logger.error(_scope, 'failed', { error: e });
       // do not re-throw, return what we've claimed so far