X-Git-Url: http://git.squeep.com/?a=blobdiff_plain;f=src%2Fcommunication.js;h=960a0f994b27a572236a92131638b326c59f4ece;hb=HEAD;hp=3b436786fabf3ebfddfb245b479da669894f8377;hpb=71efac9dcd7dc219cb83799391e7adc63cd4c662;p=websub-hub diff --git a/src/communication.js b/src/communication.js index 3b43678..9a4da3a 100644 --- a/src/communication.js +++ b/src/communication.js @@ -5,7 +5,6 @@ * worker which initiates most of them. */ -const axios = require('axios'); const common = require('./common'); const crypto = require('crypto'); const Enum = require('./enum'); @@ -14,8 +13,6 @@ const Worker = require('./worker'); const LinkHelper = require('./link-helper'); const { version: packageVersion, name: packageName } = require('../package.json'); // For default UA string -const { performance } = require('perf_hooks'); - const _fileScope = common.fileScope(__filename); class Communication { @@ -32,28 +29,68 @@ class Communication { this.logger.error(_fileScope('constructor'), 'empty dingus.selfBaseUrl value, server responses will not be compliant'); } - // Set common options - this.axios = axios.create({ - validateStatus: null, // Non-success responses are not exceptional - headers: { - [Enum.Header.UserAgent]: Communication.userAgentString(options.userAgent), - }, - }); - - this.axios.interceptors.request.use((request) => { - request.startTimestampMs = performance.now(); - return request; - }); - this.axios.interceptors.response.use((response) => { - response.elapsedTimeMs = performance.now() - response.config.startTimestampMs; - return response; - }); + this.Got = undefined; // Will become the async imported got. + this.got = this._init; // First invocation imports got and replaces this. this.worker = new Worker(logger, db, this.workFeed.bind(this), options); this.worker.start(); } + /** + * Do a little dance to cope with ESM dynamic import. + * @param {...any} args arguments + * @returns {Promise} got response + */ + async _init(...args) { + if (!this.Got) { + // For some reason eslint is confused about import being supported here. + + this.Got = await import('got'); + this.got = this.Got.got.extend({ + followRedirect: false, // Outgoing API calls should not encounter redirects + throwHttpErrors: false, // We will be checking status codes explicitly + headers: { + [Enum.Header.UserAgent]: Communication.userAgentString(this.options.userAgent), + }, + timeout: { + request: this.options.communication.requestTimeoutMs || 120000, + }, + hooks: { + beforeRetry: [ + this._onRetry, + ], + }, + }); + } + + /* istanbul ignore if */ + if (args.length) { + /* istanbul ignore next */ + return this.got(...args); + } + } + + + /** + * Take note of transient retries. + * @param {*} error error + * @param {*} retryCount retry count + */ + _onRetry(error, retryCount) { + const _scope = _fileScope('_onRetry'); + this.logger.debug(_scope, 'retry', { retryCount, error }); + } + + + /** + * Construct a user-agent value. + * @param {object} userAgentConfig user agent config + * @param {string=} userAgentConfig.product product name (default package name) + * @param {string=} userAgentConfig.version version (default package version) + * @param {string=} userAgentConfig.implementation implementation (default spec supported) + * @returns {string} user agent string 'product/version (implementation)' + */ static userAgentString(userAgentConfig) { // eslint-disable-next-line security/detect-object-injection const _conf = (field, def) => (userAgentConfig && field in userAgentConfig) ? userAgentConfig[field] : def; @@ -67,10 +104,13 @@ class Communication { } + /** + * @alias {number} Integer + */ /** * Generate a random string. - * @param {Integer} bytes - * @returns {String} + * @param {Integer} bytes size of challenge + * @returns {Promise} base64 randomness */ static async generateChallenge(bytes = 30) { return (await common.randomBytesAsync(bytes)).toString('base64'); @@ -79,135 +119,38 @@ class Communication { /** * Generate the signature string for content. - * @param {Buffer} message - * @param {Buffer} secret - * @param {String} algorithm - * @returns {String} + * @param {Buffer} message message to sign + * @param {Buffer} secret secret to sign with + * @param {string} algorithm algorithm to sign with + * @returns {string} signature string */ static signature(message, secret, algorithm) { - const hmac = crypto.createHmac(algorithm, secret); - hmac.update(message); - return `${algorithm}=${hmac.digest('hex')}`; + const hmac = crypto.createHmac(algorithm, secret) + .update(message) + .digest('hex'); + return `${algorithm}=${hmac}`; } /** * Generate the hash for content. - * @param {Buffer} content - * @param {String} algorithm - * @returns + * @param {Buffer} content content + * @param {string} algorithm algorithm + * @returns {string} hash of content */ static contentHash(content, algorithm) { - const hash = crypto.createHash(algorithm); - hash.update(content); - return hash.digest('hex'); - } - - - /** - * A request skeleton config. - * @param {String} method - * @param {String} requestUrl - * @param {String} body - * @param {Object} params - */ - static _axiosConfig(method, requestUrl, body, params = {}, headers = {}) { - const urlObj = new URL(requestUrl); - const config = { - method, - url: `${urlObj.origin}${urlObj.pathname}`, - params: urlObj.searchParams, - headers, - ...(body && { data: body }), - // Setting this does not appear to be enough to keep axios from parsing JSON response into object - responseType: 'text', - // So force the matter by eliding all response transformations - transformResponse: [ (res) => res ], - }; - Object.entries(params).map(([k, v]) => config.params.set(k, v)); - return config; - } - - - /** - * Create request config for verifying an intent. - * @param {URL} requestUrl - * @param {String} topicUrl - * @param {String} mode - * @param {Integer} leaseSeconds - * @param {String} challenge - */ - static _intentVerifyAxiosConfig(requestUrl, topicUrl, mode, leaseSeconds, challenge) { - // Explicitly convert leaseSeconds to string, due to some DB backends. (Looking at you, sqlite..) - leaseSeconds = leaseSeconds.toString(); - - return Communication._axiosConfig('GET', requestUrl, undefined, { - 'hub.mode': mode, - 'hub.topic': topicUrl, - 'hub.challenge': challenge, - 'hub.lease_seconds': leaseSeconds, - }, {}); - } - - - /** - * Create request config for denying an intent. - * @param {String} requestUrl - * @param {String} topicUrl - * @param {String} reason - * @returns {String} - */ - static _intentDenyAxiosConfig(requestUrl, topicUrl, reason) { - return Communication._axiosConfig('GET', requestUrl, undefined, { - 'hub.mode': Enum.Mode.Denied, - 'hub.topic': topicUrl, - ...(reason && { 'hub.reason': reason }), - }, {}); - } - - - /** - * Create request config for querying publisher for subscription validation. - * @param {Topic} topic - * @param {Verification} verification - * @returns {String} - */ - static _publisherValidationAxiosConfig(topic, verification) { - const body = { - callback: verification.callback, - topic: topic.url, - ...(verification.httpFrom && { from: verification.httpFrom }), - ...(verification.httpRemoteAddr && { address: verification.httpRemoteAddr }), - }; - return Communication._axiosConfig('POST', topic.publisherValidationUrl, body, {}, { - [Enum.Header.ContentType]: Enum.ContentType.ApplicationJson, - }); - } - - - /** - * Create request config for fetching topic content. - * Prefer existing content-type, but accept anything. - * @param {Topic} topic - * @returns {String} - */ - static _topicFetchAxiosConfig(topic) { - const acceptWildcard = '*/*' + (topic.contentType ? ';q=0.9' : ''); - const acceptPreferred = [topic.contentType, acceptWildcard].filter((x) => x).join(', '); - return Communication._axiosConfig('GET', topic.url, undefined, {}, { - [Enum.Header.Accept]: acceptPreferred, - ...(topic.httpEtag && { [Enum.Header.IfNoneMatch]: topic.httpEtag }), - ...(topic.httpLastModified && { [Enum.Header.IfModifiedSince]: topic.httpLastModified }), - }); + return crypto.createHash(algorithm) + .update(content) + .digest('hex'); } /** * Attempt to verify a requested intent with client callback endpoint. - * @param {*} dbCtx - * @param {*} verificationId - * @param {String} requestId - * @returns {Boolean} whether to subsequently attempt next task if verification succeeds + * @param {*} dbCtx db context + * @param {*} verificationId verification id + * @param {string} requestId request id + * @returns {Promise} whether to subsequently attempt next task if verification succeeds */ async verificationProcess(dbCtx, verificationId, requestId) { const _scope = _fileScope('verificationProcess'); @@ -220,8 +163,8 @@ class Communication { const topic = await this.db.topicGetById(dbCtx, verification.topicId); if (!topic) { - this.logger.error(_scope, 'no such topic id', { verification, requestId }); - throw new Errors.InternalInconsistencyError('no such topic id'); + this.logger.error(_scope, Enum.Message.NoSuchTopicId, { verification, requestId }); + throw new Errors.InternalInconsistencyError(Enum.Message.NoSuchTopicId); } if (!topic.isActive) { @@ -256,19 +199,37 @@ class Communication { } } - const u = new URL(verification.callback); - let callbackRequestConfig, challenge; + const callbackRequestConfig = { + method: 'GET', + url: new URL(verification.callback), + responseType: 'text', + }; + const callbackParams = { + 'hub.topic': topic.url, + 'hub.mode': verification.mode, + }; + + let challenge; if (verification.mode === Enum.Mode.Denied) { - // Denials don't have a challenge. - callbackRequestConfig = Communication._intentDenyAxiosConfig(u, topic.url, verification.reason); + // Denials don't have a challenge, but might have a reason. + if (verification.reason) { + callbackParams['hub.reason'] = verification.reason; + } } else { // Subscriptions and unsubscriptions require challenge matching. challenge = await Communication.generateChallenge(); - callbackRequestConfig = Communication._intentVerifyAxiosConfig(u, topic.url, verification.mode, verification.leaseSeconds, challenge); + Object.assign(callbackParams, { + 'hub.challenge': challenge, + // Explicitly convert leaseSeconds to string, due to some DB backends. (Looking at you, sqlite..) + 'hub.lease_seconds': verification.leaseSeconds.toString(), + }); } + Object.entries(callbackParams) + .forEach(([k, v]) => callbackRequestConfig.url.searchParams.set(k, v)) + ; const logInfoData = { - callbackUrl: u.href, + callbackUrl: callbackRequestConfig.url.href, topicUrl: topic.url, mode: verification.mode, originalRequestId: verification.requestId, @@ -280,18 +241,18 @@ class Communication { let response; try { - response = await this.axios(callbackRequestConfig); + response = await this.got(callbackRequestConfig); } catch (e) { this.logger.error(_scope, 'verification request failed', { ...logInfoData, error: e }); await this.db.verificationIncomplete(dbCtx, verificationId, this.options.communication.retryBackoffSeconds); return; } - logInfoData.response = common.axiosResponseLogData(response); + logInfoData.response = common.gotResponseLogData(response); this.logger.debug(_scope, 'verification response', logInfoData ); let verificationAccepted = true; // Presume success. - switch (common.httpStatusCodeClass(response.status)) { + switch (common.httpStatusCodeClass(response.statusCode)) { case 2: // Success, fall out of switch. break; @@ -315,7 +276,7 @@ class Communication { } if ([Enum.Mode.Subscribe, Enum.Mode.Unsubscribe].includes(verification.mode) - && response.data !== challenge) { + && response.body !== challenge) { this.logger.info(_scope, 'verification rejected by challenge', logInfoData); verificationAccepted = false; } @@ -341,7 +302,7 @@ class Communication { case Enum.Mode.Denied: await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId); if (topic.isDeleted) { - // Remove a deleted topic after he last subscription is notified. + // Remove a deleted topic after the last subscription is notified. await this.db.topicPendingDelete(txCtx, topic.id); } break; @@ -358,19 +319,24 @@ class Communication { } + /** + * @alias {object} TopicData + * @alias {object} VerificationData + */ /** * Attempt to verify a pending subscription request with publisher. * Updates (and persists) verification. * Returns boolean of status of publisher contact, and hence * whether to continue verification with client. - * @param {*} dbCtx - * @param {TopicData} topic - * @param {VerificationData} verification - * @returns {Boolean} + * + * This is not defined by the spec. We opt to speak JSON here. + * @param {*} dbCtx db context + * @param {TopicData} topic topic + * @param {VerificationData} verification verification + * @returns {Promise} true if successful contact with publisher */ async publisherValidate(dbCtx, topic, verification) { const _scope = _fileScope('publisherValidate'); - const publisherValidationRequestConfig = Communication._publisherValidationAxiosConfig(topic, verification); const logInfoData = { topicUrl: topic.url, callbackUrl: verification.callback, @@ -380,18 +346,29 @@ class Communication { this.logger.info(_scope, 'publisher validation request', logInfoData); + const publisherValidationRequestConfig = { + method: 'POST', + url: topic.publisherValidationUrl, + json: { + callback: verification.callback, + topic: topic.url, + ...(verification.httpFrom && { from: verification.httpFrom }), + ...(verification.httpRemoteAddr && { address: verification.httpRemoteAddr }), + }, + responseType: 'json', + }; try { - response = await this.axios(publisherValidationRequestConfig); + response = await this.got(publisherValidationRequestConfig); } catch (e) { this.logger.error(_scope, 'publisher validation failed', { ...logInfoData, error: e }); return false; // Do not continue with client verification. } - logInfoData.response = common.axiosResponseLogData(response); + logInfoData.response = common.gotResponseLogData(response); this.logger.debug(_scope, 'validation response', logInfoData); let verificationNeedsUpdate = false; - switch (common.httpStatusCodeClass(response.status)) { + switch (common.httpStatusCodeClass(response.statusCode)) { case 2: this.logger.info(_scope, 'publisher validation complete, allowed', logInfoData); break; @@ -422,10 +399,10 @@ class Communication { /** * Retrieve content from a topic. - * @param {*} dbCtx - * @param {*} topicId - * @param {String} requestId - * @returns + * @param {*} dbCtx db context + * @param {*} topicId topic id + * @param {string} requestId request id + * @returns {Promise} */ async topicFetchProcess(dbCtx, topicId, requestId) { const _scope = _fileScope('topicFetchProcess'); @@ -438,8 +415,8 @@ class Communication { const topic = await this.db.topicGetById(dbCtx, topicId); if (topic === undefined) { - this.logger.error(_scope, 'no such topic id', logInfoData); - throw new Errors.InternalInconsistencyError('no such topic id'); + this.logger.error(_scope, Enum.Message.NoSuchTopicId, logInfoData); + throw new Errors.InternalInconsistencyError(Enum.Message.NoSuchTopicId); } // Cull any expired subscriptions @@ -452,22 +429,32 @@ class Communication { return; } - const updateRequestConfig = Communication._topicFetchAxiosConfig(topic); + const updateRequestConfig = { + followRedirect: true, + method: 'GET', + url: topic.url, + headers: { + [Enum.Header.Accept]: [topic.contentType, `*/*${topic.contentType ? ';q=0.9' : ''}`].filter((x) => x).join(', '), + ...(topic.httpEtag && { [Enum.Header.IfNoneMatch]: topic.httpEtag }), + ...(topic.httpLastModified && { [Enum.Header.IfModifiedSince]: topic.httpLastModified }), + }, + responseType: 'buffer', + }; this.logger.info(_scope, 'topic update request', logInfoData); let response; try { - response = await this.axios(updateRequestConfig); + response = await this.got(updateRequestConfig); } catch (e) { - this.logger.error(_scope, 'update request failed', logInfoData); + this.logger.error(_scope, 'update request failed', { ...logInfoData, error: e }); await this.db.topicFetchIncomplete(dbCtx, topicId, this.options.communication.retryBackoffSeconds); return; } - logInfoData.response = common.axiosResponseLogData(response); + logInfoData.response = common.gotResponseLogData(response); this.logger.debug(_scope, 'fetch response', logInfoData); - switch (common.httpStatusCodeClass(response.status)) { + switch (common.httpStatusCodeClass(response.statusCode)) { case 2: case 3: // Fall out of switch on success @@ -484,13 +471,13 @@ class Communication { return; } - if (response.status === 304) { + if (response.statusCode === 304) { this.logger.info(_scope, 'content has not changed, per server', logInfoData); await this.db.topicFetchComplete(dbCtx, topicId); return; } - const contentHash = Communication.contentHash(response.data, topic.contentHashAlgorithm); + const contentHash = Communication.contentHash(response.body, topic.contentHashAlgorithm); logInfoData.contentHash = contentHash; if (topic.contentHash === contentHash) { this.logger.info(_scope, 'content has not changed', logInfoData); @@ -498,7 +485,7 @@ class Communication { return; } - const validHub = await this.linkHelper.validHub(topic.url, response.headers, response.data); + const validHub = await this.linkHelper.validHub(topic.url, response.headers, response.body); if (!validHub) { this.logger.info(_scope, 'retrieved topic does not list us as hub', { logInfoData }); if (this.options.communication.strictTopicHubLink) { @@ -520,7 +507,7 @@ class Communication { await this.db.transaction(dbCtx, async (txCtx) => { await this.db.topicSetContent(txCtx, { topicId, - content: Buffer.from(response.data), + content: Buffer.from(response.body), contentHash, ...(contentType && { contentType }), ...(httpETag && { httpETag }), @@ -535,10 +522,10 @@ class Communication { /** * Attempt to deliver a topic's content to a subscription. - * @param {*} dbCtx - * @param {String} callback - * @param {*} topicId - * @param {String} requestId + * @param {*} dbCtx db context + * @param {string} subscriptionId subscription id + * @param {string} requestId request id + * @returns {Promise} */ async subscriptionDeliveryProcess(dbCtx, subscriptionId, requestId) { const _scope = _fileScope('subscriptionDeliveryProcess'); @@ -587,26 +574,32 @@ class Communication { logInfoData.contentLength = topic.content.length; logInfoData.contentHash = topic.contentHash; - const updateAxiosConfig = Communication._axiosConfig('POST', subscription.callback, topic.content, {}, { - [Enum.Header.Link]: `<${topic.url}>; rel="self"${this.linkHub}`, - [Enum.Header.ContentType]: topic.contentType || Enum.ContentType.TextPlain, - ...(subscription.secret && { [Enum.Header.XHubSignature]: Communication.signature(topic.content, subscription.secret, subscription.signatureAlgorithm) }), - }); + const updateConfig = { + method: 'POST', + url: subscription.callback, + body: topic.content, + headers: { + [Enum.Header.Link]: `<${topic.url}>; rel="self"${this.linkHub}`, + [Enum.Header.ContentType]: topic.contentType || Enum.ContentType.TextPlain, + ...(subscription.secret && { [Enum.Header.XHubSignature]: Communication.signature(topic.content, subscription.secret, subscription.signatureAlgorithm) }), + }, + responseType: 'text', + }; this.logger.info(_scope, 'update request', logInfoData); let response; try { - response = await this.axios(updateAxiosConfig); + response = await this.got(updateConfig); } catch (e) { this.logger.error(_scope, 'update request failed', { ...logInfoData, error: e }); await this.db.subscriptionDeliveryIncomplete(dbCtx, subscription.callback, subscription.topicId, this.options.communication.retryBackoffSeconds); return; } - logInfoData.response = common.axiosResponseLogData(response); + logInfoData.response = common.gotResponseLogData(response); this.logger.debug(_scope, 'update response', logInfoData); - switch (common.httpStatusCodeClass(response.status)) { + switch (common.httpStatusCodeClass(response.statusCode)) { case 2: // Fall out of switch on success. break; @@ -617,7 +610,7 @@ class Communication { return; case 4: - if (response.status === 410) { // GONE + if (response.statusCode === 410) { // GONE this.logger.info(_scope, 'client declined further updates', logInfoData); await this.db.subscriptionDeliveryGone(dbCtx, subscription.callback, subscription.topicId); return; @@ -637,9 +630,10 @@ class Communication { /** * Claim and work a specific topic fetch task. - * @param {*} dbCtx - * @param {*} id - * @param {String} requestId + * @param {*} dbCtx db context + * @param {string} topicId topic id + * @param {string} requestId request id + * @returns {Promise} */ async topicFetchClaimAndProcessById(dbCtx, topicId, requestId) { const _scope = _fileScope('topicFetchClaimAndProcessById'); @@ -655,10 +649,10 @@ class Communication { /** * Claim and work a specific verification confirmation task. - * @param {*} dbCtx - * @param {*} verificationId - * @param {String} requestId - * @returns + * @param {*} dbCtx db context + * @param {*} verificationId verification id + * @param {string} requestId request id + * @returns {Promise} whether to subsequently attempt next task if verification succeeds */ async verificationClaimAndProcessById(dbCtx, verificationId, requestId) { const _scope = _fileScope('verificationClaimAndProcessById'); @@ -674,9 +668,9 @@ class Communication { /** * - * @param {*} dbCtx - * @param {Number} wanted maximum tasks to claim - * @returns {Promise[]} + * @param {*} dbCtx db context + * @param {number} wanted maximum tasks to claim + * @returns {Promise[]} array of promises processing work */ async workFeed(dbCtx, wanted) { const _scope = _fileScope('workFeed');