use got instead of axios, some cleanup, problem with async context being lost for...
[websub-hub] / src / communication.js
index feda5887146c6b70ae242e1daf52ab3fbbe10a44..c0d08fde1f37509538c7bbc21efaff78948b46d4 100644 (file)
@@ -5,7 +5,6 @@
  * worker which initiates most of them.
  */
 
-const axios = require('axios');
 const common = require('./common');
 const crypto = require('crypto');
 const Enum = require('./enum');
@@ -14,8 +13,6 @@ const Worker = require('./worker');
 const LinkHelper = require('./link-helper');
 const { version: packageVersion, name: packageName } = require('../package.json'); // For default UA string
 
-const { performance } = require('perf_hooks');
-
 const _fileScope = common.fileScope(__filename);
 
 class Communication {
@@ -32,28 +29,68 @@ class Communication {
       this.logger.error(_fileScope('constructor'), 'empty dingus.selfBaseUrl value, server responses will not be compliant');
     }
 
-    // Set common options
-    this.axios = axios.create({
-      validateStatus: null, // Non-success responses are not exceptional
-      headers: {
-        [Enum.Header.UserAgent]: Communication.userAgentString(options.userAgent),
-      },
-    });
-
-    this.axios.interceptors.request.use((request) => {
-      request.startTimestampMs = performance.now();
-      return request;
-    });
-    this.axios.interceptors.response.use((response) => {
-      response.elapsedTimeMs = performance.now() - response.config.startTimestampMs;
-      return response;
-    });
+    this.Got = undefined; // Will become the async imported got.
+    this.got = this._init; // First invocation imports got and replaces this.
 
     this.worker = new Worker(logger, db, this.workFeed.bind(this), options);
     this.worker.start();
   }
 
 
+  /**
+   * Do a little dance to cope with ESM dynamic import.
+   * @param  {...any} args
+   * @returns {Promise<any>}
+   */
+  async _init(...args) {
+    if (!this.Got) {
+      // For some reason eslint is confused about import being supported here.
+      // eslint-disable-next-line
+      this.Got = await import('got');
+      this.got = this.Got.got.extend({
+        followRedirect: false, // Outgoing API calls should not encounter redirects
+        throwHttpErrors: false, // We will be checking status codes explicitly
+        headers: {
+          [Enum.Header.UserAgent]: Communication.userAgentString(this.options.userAgent),
+        },
+        timeout: {
+          request: this.options.communication.requestTimeoutMs || 120000,
+        },
+        hooks: {
+          beforeRetry: [
+            this._onRetry,
+          ],
+        },
+      });
+    }
+
+    /* istanbul ignore if */
+    if (args.length) {
+      /* istanbul ignore next */
+      return this.got(...args);
+    }
+  }
+
+
+  /**
+   * Take note of transient retries.
+   * @param {*} error
+   * @param {*} retryCount
+   */
+  _onRetry(error, retryCount) {
+    const _scope = _fileScope('_onRetry');
+    this.logger.debug(_scope, 'retry', { retryCount, error });
+  }
+
+
+  /**
+   * Construct a user-agent value.
+   * @param {Object} userAgentConfig
+   * @param {String=} userAgentConfig.product
+   * @param {String=} userAgentConfig.version
+   * @param {String=} userAgentConfig.implementation
+   * @returns {String}
+   */
   static userAgentString(userAgentConfig) {
     // eslint-disable-next-line security/detect-object-injection
     const _conf = (field, def) => (userAgentConfig && field in userAgentConfig) ? userAgentConfig[field] : def;
@@ -70,7 +107,7 @@ class Communication {
   /**
    * Generate a random string.
    * @param {Integer} bytes 
-   * @returns {String}
+   * @returns {Promise<String>}
    */
   static async generateChallenge(bytes = 30) {
     return (await common.randomBytesAsync(bytes)).toString('base64');
@@ -85,9 +122,10 @@ class Communication {
    * @returns {String}
    */
   static signature(message, secret, algorithm) {
-    const hmac = crypto.createHmac(algorithm, secret);
-    hmac.update(message);
-    return `${algorithm}=${hmac.digest('hex')}`;
+    const hmac = crypto.createHmac(algorithm, secret)
+      .update(message)
+      .digest('hex');
+    return `${algorithm}=${hmac}`;
   }
 
 
@@ -95,110 +133,12 @@ class Communication {
    * Generate the hash for content.
    * @param {Buffer} content 
    * @param {String} algorithm 
-   * @returns 
-   */
-  static contentHash(content, algorithm) {
-    const hash = crypto.createHash(algorithm);
-    hash.update(content);
-    return hash.digest('hex');
-  }
-
-
-  /**
-   * A request skeleton config.
-   * @param {String} method
-   * @param {String} requestUrl
-   * @param {String} body
-   * @param {Object} params
-   */
-  static _axiosConfig(method, requestUrl, body, params = {}, headers = {}) {
-    const urlObj = new URL(requestUrl);
-    const config = {
-      method,
-      url: `${urlObj.origin}${urlObj.pathname}`,
-      params: urlObj.searchParams,
-      headers,
-      ...(body && { data: body }),
-      // Setting this does not appear to be enough to keep axios from parsing JSON response into object
-      responseType: 'text',
-      // So force the matter by eliding all response transformations
-      transformResponse: [ (res) => res ],
-    };
-    Object.entries(params).map(([k, v]) => config.params.set(k, v));
-    return config;
-  }
-
-
-  /**
-   * Create request config for verifying an intent.
-   * @param {URL} requestUrl
-   * @param {String} topicUrl
-   * @param {String} mode
-   * @param {Integer} leaseSeconds
-   * @param {String} challenge
-   */
-  static _intentVerifyAxiosConfig(requestUrl, topicUrl, mode, leaseSeconds, challenge) {
-    // Explicitly convert leaseSeconds to string, due to some DB backends. (Looking at you, sqlite..)
-    leaseSeconds = leaseSeconds.toString();
-
-    return Communication._axiosConfig('GET', requestUrl, undefined, {
-      'hub.mode': mode,
-      'hub.topic': topicUrl,
-      'hub.challenge': challenge,
-      'hub.lease_seconds': leaseSeconds,
-    }, {});
-  }
-
-
-  /**
-   * Create request config for denying an intent.
-   * @param {String} requestUrl 
-   * @param {String} topicUrl 
-   * @param {String} reason 
-   * @returns {String}
-   */
-  static _intentDenyAxiosConfig(requestUrl, topicUrl, reason) {
-    return Communication._axiosConfig('GET', requestUrl, undefined, {
-      'hub.mode': Enum.Mode.Denied,
-      'hub.topic': topicUrl,
-      ...(reason && { 'hub.reason': reason }),
-    }, {});
-  }
-
-
-  /**
-   * Create request config for querying publisher for subscription validation.
-   * @param {Topic} topic 
-   * @param {Verification} verification 
-   * @returns {String}
-   */
-  static _publisherValidationAxiosConfig(topic, verification) {
-    const body = {
-      callback: verification.callback,
-      topic: topic.url,
-      ...(verification.httpFrom && { from: verification.httpFrom }),
-      ...(verification.httpRemoteAddr && { address: verification.httpRemoteAddr }),
-    };
-    return Communication._axiosConfig('POST', topic.publisherValidationUrl, body, {}, {
-      [Enum.Header.ContentType]: Enum.ContentType.ApplicationJson,
-    });
-  }
-
-
-  /**
-   * Create request config for fetching topic content.
-   * Prefer existing content-type, but accept anything.
-   * @param {Topic} topic 
    * @returns {String}
    */
-  static _topicFetchAxiosConfig(topic) {
-    const acceptWildcard = '*/*' + (topic.contentType ? ';q=0.9' : '');
-    const acceptPreferred = [topic.contentType, acceptWildcard].filter((x) => x).join(', ');
-    return Communication._axiosConfig('GET', topic.url, undefined, {}, {
-      [Enum.Header.Accept]: acceptPreferred,
-      ...(topic.httpEtag && { [Enum.Header.IfNoneMatch]: topic.httpEtag }),
-      ...(topic.httpLastModified && { [Enum.Header.IfModifiedSince]: topic.httpLastModified }),
-    });
+  static contentHash(content, algorithm) {
+    return crypto.createHash(algorithm)
+      .update(content)
+      .digest('hex');
   }
 
 
@@ -256,19 +196,37 @@ class Communication {
       }
     }
 
-    const u = new URL(verification.callback);
-    let callbackRequestConfig, challenge;
+    const callbackRequestConfig = {
+      method: 'GET',
+      url: new URL(verification.callback),
+      responseType: 'text',
+    };
+    const callbackParams = {
+      'hub.topic': topic.url,
+      'hub.mode': verification.mode,
+    };
+
+    let challenge;
     if (verification.mode === Enum.Mode.Denied) {
-      // Denials don't have a challenge.
-      callbackRequestConfig = Communication._intentDenyAxiosConfig(u, topic.url, verification.reason);
+      // Denials don't have a challenge, but might have a reason.
+      if (verification.reason) {
+        callbackParams['hub.reason'] = verification.reason;
+      }
     } else {
       // Subscriptions and unsubscriptions require challenge matching.
       challenge = await Communication.generateChallenge();
-      callbackRequestConfig = Communication._intentVerifyAxiosConfig(u, topic.url, verification.mode, verification.leaseSeconds, challenge);
+      Object.assign(callbackParams, {
+        'hub.challenge': challenge,
+        // Explicitly convert leaseSeconds to string, due to some DB backends. (Looking at you, sqlite..)
+        'hub.lease_seconds': verification.leaseSeconds.toString(),
+      });
     }
+    Object.entries(callbackParams)
+      .forEach(([k, v]) => callbackRequestConfig.url.searchParams.set(k, v))
+    ;
 
     const logInfoData = {
-      callbackUrl: u.href,
+      callbackUrl: callbackRequestConfig.url.href,
       topicUrl: topic.url,
       mode: verification.mode,
       originalRequestId: verification.requestId,
@@ -280,18 +238,18 @@ class Communication {
 
     let response;
     try {
-      response = await this.axios(callbackRequestConfig);
+      response = await this.got(callbackRequestConfig);
     } catch (e) {
       this.logger.error(_scope, 'verification request failed', { ...logInfoData, error: e });
       await this.db.verificationIncomplete(dbCtx, verificationId, this.options.communication.retryBackoffSeconds);
       return;
     }
-    logInfoData.response = common.axiosResponseLogData(response);
+    logInfoData.response = common.gotResponseLogData(response);
     this.logger.debug(_scope, 'verification response', logInfoData );
 
     let verificationAccepted = true; // Presume success.
 
-    switch (common.httpStatusCodeClass(response.status)) {
+    switch (common.httpStatusCodeClass(response.statusCode)) {
       case 2:
         // Success, fall out of switch.
         break;
@@ -315,7 +273,7 @@ class Communication {
     }
 
     if ([Enum.Mode.Subscribe, Enum.Mode.Unsubscribe].includes(verification.mode)
-    &&  response.data !== challenge) {
+    &&  response.body !== challenge) {
       this.logger.info(_scope, 'verification rejected by challenge', logInfoData);
       verificationAccepted = false;
     }
@@ -341,7 +299,7 @@ class Communication {
         case Enum.Mode.Denied:
           await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
           if (topic.isDeleted) {
-            // Remove a deleted topic after he last subscription is notified.
+            // Remove a deleted topic after the last subscription is notified.
             await this.db.topicPendingDelete(txCtx, topic.id);
           }
           break;
@@ -363,6 +321,8 @@ class Communication {
    * Updates (and persists) verification.
    * Returns boolean of status of publisher contact, and hence
    * whether to continue verification with client.
+   *
+   * This is not defined by the spec.  We opt to speak JSON here.
    * @param {*} dbCtx
    * @param {TopicData} topic
    * @param {VerificationData} verification
@@ -370,7 +330,6 @@ class Communication {
    */
   async publisherValidate(dbCtx, topic, verification) {
     const _scope = _fileScope('publisherValidate');
-    const publisherValidationRequestConfig = Communication._publisherValidationAxiosConfig(topic, verification);
     const logInfoData = {
       topicUrl: topic.url,
       callbackUrl: verification.callback,
@@ -380,18 +339,29 @@ class Communication {
 
     this.logger.info(_scope, 'publisher validation request', logInfoData);
 
+    const publisherValidationRequestConfig = {
+      method: 'POST',
+      url: topic.publisherValidationUrl,
+      json: {
+        callback: verification.callback,
+        topic: topic.url,
+        ...(verification.httpFrom && { from: verification.httpFrom }),
+        ...(verification.httpRemoteAddr && { address: verification.httpRemoteAddr }),
+      },
+      responseType: 'json',
+    };
     try {
-      response = await this.axios(publisherValidationRequestConfig);
+      response = await this.got(publisherValidationRequestConfig);
     } catch (e) {
       this.logger.error(_scope, 'publisher validation failed', { ...logInfoData, error: e });
       return false; // Do not continue with client verification.
     }
 
-    logInfoData.response = common.axiosResponseLogData(response);
+    logInfoData.response = common.gotResponseLogData(response);
     this.logger.debug(_scope, 'validation response', logInfoData);
 
     let verificationNeedsUpdate = false;
-    switch (common.httpStatusCodeClass(response.status)) {
+    switch (common.httpStatusCodeClass(response.statusCode)) {
       case 2:
         this.logger.info(_scope, 'publisher validation complete, allowed', logInfoData);
         break;
@@ -452,22 +422,32 @@ class Communication {
       return;
     }
 
-    const updateRequestConfig = Communication._topicFetchAxiosConfig(topic);
+    const updateRequestConfig = {
+      followRedirect: true,
+      method: 'GET',
+      url: topic.url,
+      headers: {
+        [Enum.Header.Accept]: [topic.contentType, `*/*${topic.contentType ? ';q=0.9' : ''}`].filter((x) => x).join(', '),
+        ...(topic.httpEtag && { [Enum.Header.IfNoneMatch]: topic.httpEtag }),
+        ...(topic.httpLastModified && { [Enum.Header.IfModifiedSince]: topic.httpLastModified }),
+      },
+      responseType: 'buffer',
+    };
 
     this.logger.info(_scope, 'topic update request', logInfoData);
     
     let response;
     try {
-      response = await this.axios(updateRequestConfig);
+      response = await this.got(updateRequestConfig);
     } catch (e) {
       this.logger.error(_scope, 'update request failed', logInfoData);
       await this.db.topicFetchIncomplete(dbCtx, topicId, this.options.communication.retryBackoffSeconds);
       return;
     }
-    logInfoData.response = common.axiosResponseLogData(response);
+    logInfoData.response = common.gotResponseLogData(response);
     this.logger.debug(_scope, 'fetch response', logInfoData);
 
-    switch (common.httpStatusCodeClass(response.status)) {
+    switch (common.httpStatusCodeClass(response.statusCode)) {
       case 2:
       case 3:
         // Fall out of switch on success
@@ -484,13 +464,13 @@ class Communication {
         return;
     }
 
-    if (response.status === 304) {
+    if (response.statusCode === 304) {
       this.logger.info(_scope, 'content has not changed, per server', logInfoData);
       await this.db.topicFetchComplete(dbCtx, topicId);
       return;
     }
 
-    const contentHash = Communication.contentHash(response.data, topic.contentHashAlgorithm);
+    const contentHash = Communication.contentHash(response.body, topic.contentHashAlgorithm);
     logInfoData.contentHash = contentHash;
     if (topic.contentHash === contentHash) {
       this.logger.info(_scope, 'content has not changed', logInfoData);
@@ -498,7 +478,7 @@ class Communication {
       return;
     }
 
-    const validHub = await this.linkHelper.validHub(topic.url, response.headers, response.data);
+    const validHub = await this.linkHelper.validHub(topic.url, response.headers, response.body);
     if (!validHub) {
       this.logger.info(_scope, 'retrieved topic does not list us as hub', { logInfoData });
       if (this.options.communication.strictTopicHubLink) {
@@ -520,7 +500,7 @@ class Communication {
     await this.db.transaction(dbCtx, async (txCtx) => {
       await this.db.topicSetContent(txCtx, {
         topicId,
-        content: Buffer.from(response.data),
+        content: Buffer.from(response.body),
         contentHash,
         ...(contentType && { contentType }),
         ...(httpETag && { httpETag }),
@@ -587,26 +567,32 @@ class Communication {
     logInfoData.contentLength = topic.content.length;
     logInfoData.contentHash = topic.contentHash;
 
-    const updateAxiosConfig = Communication._axiosConfig('POST', subscription.callback, topic.content, {}, {
-      [Enum.Header.Link]: `<${topic.url}>; rel="self"${this.linkHub}`,
-      [Enum.Header.ContentType]: topic.contentType || Enum.ContentType.TextPlain,
-      ...(subscription.secret && { [Enum.Header.XHubSignature]: Communication.signature(topic.content, subscription.secret, subscription.signatureAlgorithm) }),
-    });
+    const updateConfig = {
+      method: 'POST',
+      url: subscription.callback,
+      body: topic.content,
+      headers: {
+        [Enum.Header.Link]: `<${topic.url}>; rel="self"${this.linkHub}`,
+        [Enum.Header.ContentType]: topic.contentType || Enum.ContentType.TextPlain,
+        ...(subscription.secret && { [Enum.Header.XHubSignature]: Communication.signature(topic.content, subscription.secret, subscription.signatureAlgorithm) }),
+      },
+      responseType: 'text',
+    };
 
     this.logger.info(_scope, 'update request', logInfoData);
 
     let response;
     try {
-      response = await this.axios(updateAxiosConfig);
+      response = await this.got(updateConfig);
     } catch (e) {
       this.logger.error(_scope, 'update request failed', { ...logInfoData, error: e });
       await this.db.subscriptionDeliveryIncomplete(dbCtx, subscription.callback, subscription.topicId, this.options.communication.retryBackoffSeconds);
       return;
     }
-    logInfoData.response = common.axiosResponseLogData(response);
+    logInfoData.response = common.gotResponseLogData(response);
     this.logger.debug(_scope, 'update response', logInfoData);
 
-    switch (common.httpStatusCodeClass(response.status)) {
+    switch (common.httpStatusCodeClass(response.statusCode)) {
       case 2:
         // Fall out of switch on success.
         break;
@@ -617,7 +603,7 @@ class Communication {
         return;
 
       case 4:
-        if (response.status === 410) { // GONE
+        if (response.statusCode === 410) { // GONE
           this.logger.info(_scope, 'client declined further updates', logInfoData);
           await this.db.subscriptionDeliveryGone(dbCtx, subscription.callback, subscription.topicId);
           return;