* worker which initiates most of them.
*/
-const axios = require('axios');
const common = require('./common');
const crypto = require('crypto');
const Enum = require('./enum');
const LinkHelper = require('./link-helper');
const { version: packageVersion, name: packageName } = require('../package.json'); // For default UA string
-const { performance } = require('perf_hooks');
-
const _fileScope = common.fileScope(__filename);
class Communication {
this.logger.error(_fileScope('constructor'), 'empty dingus.selfBaseUrl value, server responses will not be compliant');
}
- // Set common options
- this.axios = axios.create({
- validateStatus: null, // Non-success responses are not exceptional
- headers: {
- [Enum.Header.UserAgent]: Communication.userAgentString(options.userAgent),
- },
- });
-
- this.axios.interceptors.request.use((request) => {
- request.startTimestampMs = performance.now();
- return request;
- });
- this.axios.interceptors.response.use((response) => {
- response.elapsedTimeMs = performance.now() - response.config.startTimestampMs;
- return response;
- });
+ this.Got = undefined; // Will become the async imported got.
+ this.got = this._init; // First invocation imports got and replaces this.
this.worker = new Worker(logger, db, this.workFeed.bind(this), options);
this.worker.start();
}
+ /**
+ * Do a little dance to cope with ESM dynamic import.
+ * @param {...any} args arguments
+ * @returns {Promise<any>} got response
+ */
+ async _init(...args) {
+ if (!this.Got) {
+ // For some reason eslint is confused about import being supported here.
+
+ this.Got = await import('got');
+ this.got = this.Got.got.extend({
+ followRedirect: false, // Outgoing API calls should not encounter redirects
+ throwHttpErrors: false, // We will be checking status codes explicitly
+ headers: {
+ [Enum.Header.UserAgent]: Communication.userAgentString(this.options.userAgent),
+ },
+ timeout: {
+ request: this.options.communication.requestTimeoutMs || 120000,
+ },
+ hooks: {
+ beforeRetry: [
+ this._onRetry,
+ ],
+ },
+ });
+ }
+
+ /* istanbul ignore if */
+ if (args.length) {
+ /* istanbul ignore next */
+ return this.got(...args);
+ }
+ }
+
+
+ /**
+ * Take note of transient retries.
+ * @param {*} error error
+ * @param {*} retryCount retry count
+ */
+ _onRetry(error, retryCount) {
+ const _scope = _fileScope('_onRetry');
+ this.logger.debug(_scope, 'retry', { retryCount, error });
+ }
+
+
+ /**
+ * Construct a user-agent value.
+ * @param {object} userAgentConfig user agent config
+ * @param {string=} userAgentConfig.product product name (default package name)
+ * @param {string=} userAgentConfig.version version (default package version)
+ * @param {string=} userAgentConfig.implementation implementation (default spec supported)
+ * @returns {string} user agent string 'product/version (implementation)'
+ */
static userAgentString(userAgentConfig) {
// eslint-disable-next-line security/detect-object-injection
const _conf = (field, def) => (userAgentConfig && field in userAgentConfig) ? userAgentConfig[field] : def;
- const product = _conf('product', packageName);
+ const product = _conf('product', packageName).split('/').pop();
const version = _conf('version', packageVersion);
let implementation = _conf('implementation', Enum.Specification);
if (implementation) {
}
+ /**
+ * @alias {number} Integer
+ */
/**
* Generate a random string.
- * @param {Integer} bytes
- * @returns {String}
+ * @param {Integer} bytes size of challenge
+ * @returns {Promise<string>} base64 randomness
*/
static async generateChallenge(bytes = 30) {
return (await common.randomBytesAsync(bytes)).toString('base64');
/**
* Generate the signature string for content.
- * @param {Buffer} message
- * @param {Buffer} secret
- * @param {String} algorithm
- * @returns {String}
+ * @param {Buffer} message message to sign
+ * @param {Buffer} secret secret to sign with
+ * @param {string} algorithm algorithm to sign with
+ * @returns {string} signature string
*/
static signature(message, secret, algorithm) {
- const hmac = crypto.createHmac(algorithm, secret);
- hmac.update(message);
- return `${algorithm}=${hmac.digest('hex')}`;
+ const hmac = crypto.createHmac(algorithm, secret)
+ .update(message)
+ .digest('hex');
+ return `${algorithm}=${hmac}`;
}
/**
* Generate the hash for content.
- * @param {Buffer} content
- * @param {String} algorithm
- * @returns
+ * @param {Buffer} content content
+ * @param {string} algorithm algorithm
+ * @returns {string} hash of content
*/
static contentHash(content, algorithm) {
- const hash = crypto.createHash(algorithm);
- hash.update(content);
- return hash.digest('hex');
- }
-
-
- /**
- * A request skeleton config.
- * @param {String} method
- * @param {String} requestUrl
- * @param {String} body
- * @param {Object} params
- */
- static _axiosConfig(method, requestUrl, body, params = {}, headers = {}) {
- const urlObj = new URL(requestUrl);
- const config = {
- method,
- url: `${urlObj.origin}${urlObj.pathname}`,
- params: urlObj.searchParams,
- headers,
- ...(body && { data: body }),
- // Setting this does not appear to be enough to keep axios from parsing JSON response into object
- responseType: 'text',
- // So force the matter by eliding all response transformations
- transformResponse: [ (res) => res ],
- };
- Object.entries(params).map(([k, v]) => config.params.set(k, v));
- return config;
- }
-
-
- /**
- * Create request config for verifying an intent.
- * @param {URL} requestUrl
- * @param {String} topicUrl
- * @param {String} mode
- * @param {Integer} leaseSeconds
- * @param {String} challenge
- */
- static _intentVerifyAxiosConfig(requestUrl, topicUrl, mode, leaseSeconds, challenge) {
- // Explicitly convert leaseSeconds to string, due to some DB backends. (Looking at you, sqlite..)
- leaseSeconds = leaseSeconds.toString();
-
- return Communication._axiosConfig('GET', requestUrl, undefined, {
- 'hub.mode': mode,
- 'hub.topic': topicUrl,
- 'hub.challenge': challenge,
- 'hub.lease_seconds': leaseSeconds,
- }, {});
- }
-
-
- /**
- * Create request config for denying an intent.
- * @param {String} requestUrl
- * @param {String} topicUrl
- * @param {String} reason
- * @returns {String}
- */
- static _intentDenyAxiosConfig(requestUrl, topicUrl, reason) {
- return Communication._axiosConfig('GET', requestUrl, undefined, {
- 'hub.mode': Enum.Mode.Denied,
- 'hub.topic': topicUrl,
- ...(reason && { 'hub.reason': reason }),
- }, {});
- }
-
-
- /**
- * Create request config for querying publisher for subscription validation.
- * @param {Topic} topic
- * @param {Verification} verification
- * @returns {String}
- */
- static _publisherValidationAxiosConfig(topic, verification) {
- const body = {
- callback: verification.callback,
- topic: topic.url,
- ...(verification.httpFrom && { from: verification.httpFrom }),
- ...(verification.httpRemoteAddr && { address: verification.httpRemoteAddr }),
- };
- return Communication._axiosConfig('POST', topic.publisherValidationUrl, body, {}, {
- [Enum.Header.ContentType]: Enum.ContentType.ApplicationJson,
- });
- }
-
-
- /**
- * Create request config for fetching topic content.
- * Prefer existing content-type, but accept anything.
- * @param {Topic} topic
- * @returns {String}
- */
- static _topicFetchAxiosConfig(topic) {
- const acceptWildcard = '*/*' + (topic.contentType ? ';q=0.9' : '');
- const acceptPreferred = [topic.contentType, acceptWildcard].filter((x) => x).join(', ');
- return Communication._axiosConfig('GET', topic.url, undefined, {}, {
- [Enum.Header.Accept]: acceptPreferred,
- });
+ return crypto.createHash(algorithm)
+ .update(content)
+ .digest('hex');
}
/**
* Attempt to verify a requested intent with client callback endpoint.
- * @param {*} dbCtx
- * @param {*} verificationId
- * @param {String} requestId
- * @returns {Boolean} whether to subsequently attempt next task if verification succeeds
+ * @param {*} dbCtx db context
+ * @param {*} verificationId verification id
+ * @param {string} requestId request id
+ * @returns {Promise<boolean>} whether to subsequently attempt next task if verification succeeds
*/
async verificationProcess(dbCtx, verificationId, requestId) {
const _scope = _fileScope('verificationProcess');
const topic = await this.db.topicGetById(dbCtx, verification.topicId);
if (!topic) {
- this.logger.error(_scope, 'no such topic id', { verification, requestId });
- throw new Errors.InternalInconsistencyError('no such topic id');
+ this.logger.error(_scope, Enum.Message.NoSuchTopicId, { verification, requestId });
+ throw new Errors.InternalInconsistencyError(Enum.Message.NoSuchTopicId);
}
if (!topic.isActive) {
}
}
- const u = new URL(verification.callback);
- let callbackRequestConfig, challenge;
+ const callbackRequestConfig = {
+ method: 'GET',
+ url: new URL(verification.callback),
+ responseType: 'text',
+ };
+ const callbackParams = {
+ 'hub.topic': topic.url,
+ 'hub.mode': verification.mode,
+ };
+
+ let challenge;
if (verification.mode === Enum.Mode.Denied) {
- // Denials don't have a challenge.
- callbackRequestConfig = Communication._intentDenyAxiosConfig(u, topic.url, verification.reason);
+ // Denials don't have a challenge, but might have a reason.
+ if (verification.reason) {
+ callbackParams['hub.reason'] = verification.reason;
+ }
} else {
// Subscriptions and unsubscriptions require challenge matching.
challenge = await Communication.generateChallenge();
- callbackRequestConfig = Communication._intentVerifyAxiosConfig(u, topic.url, verification.mode, verification.leaseSeconds, challenge);
+ Object.assign(callbackParams, {
+ 'hub.challenge': challenge,
+ // Explicitly convert leaseSeconds to string, due to some DB backends. (Looking at you, sqlite..)
+ 'hub.lease_seconds': verification.leaseSeconds.toString(),
+ });
}
+ Object.entries(callbackParams)
+ .forEach(([k, v]) => callbackRequestConfig.url.searchParams.set(k, v))
+ ;
const logInfoData = {
- callbackUrl: u.href,
+ callbackUrl: callbackRequestConfig.url.href,
topicUrl: topic.url,
mode: verification.mode,
originalRequestId: verification.requestId,
let response;
try {
- response = await this.axios(callbackRequestConfig);
+ response = await this.got(callbackRequestConfig);
} catch (e) {
this.logger.error(_scope, 'verification request failed', { ...logInfoData, error: e });
await this.db.verificationIncomplete(dbCtx, verificationId, this.options.communication.retryBackoffSeconds);
return;
}
- logInfoData.response = common.axiosResponseLogData(response);
+ logInfoData.response = common.gotResponseLogData(response);
this.logger.debug(_scope, 'verification response', logInfoData );
let verificationAccepted = true; // Presume success.
- switch (common.httpStatusCodeClass(response.status)) {
+ switch (common.httpStatusCodeClass(response.statusCode)) {
case 2:
// Success, fall out of switch.
break;
}
if ([Enum.Mode.Subscribe, Enum.Mode.Unsubscribe].includes(verification.mode)
- && response.data !== challenge) {
+ && response.body !== challenge) {
this.logger.info(_scope, 'verification rejected by challenge', logInfoData);
verificationAccepted = false;
}
case Enum.Mode.Denied:
await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
if (topic.isDeleted) {
- // Remove a deleted topic after he last subscription is notified.
+ // Remove a deleted topic after the last subscription is notified.
await this.db.topicPendingDelete(txCtx, topic.id);
}
break;
}
+ /**
+ * @alias {object} TopicData
+ * @alias {object} VerificationData
+ */
/**
* Attempt to verify a pending subscription request with publisher.
* Updates (and persists) verification.
* Returns boolean of status of publisher contact, and hence
* whether to continue verification with client.
- * @param {*} dbCtx
- * @param {TopicData} topic
- * @param {VerificationData} verification
- * @returns {Boolean}
+ *
+ * This is not defined by the spec. We opt to speak JSON here.
+ * @param {*} dbCtx db context
+ * @param {TopicData} topic topic
+ * @param {VerificationData} verification verification
+ * @returns {Promise<boolean>} true if successful contact with publisher
*/
async publisherValidate(dbCtx, topic, verification) {
const _scope = _fileScope('publisherValidate');
- const publisherValidationRequestConfig = Communication._publisherValidationAxiosConfig(topic, verification);
const logInfoData = {
topicUrl: topic.url,
callbackUrl: verification.callback,
this.logger.info(_scope, 'publisher validation request', logInfoData);
+ const publisherValidationRequestConfig = {
+ method: 'POST',
+ url: topic.publisherValidationUrl,
+ json: {
+ callback: verification.callback,
+ topic: topic.url,
+ ...(verification.httpFrom && { from: verification.httpFrom }),
+ ...(verification.httpRemoteAddr && { address: verification.httpRemoteAddr }),
+ },
+ responseType: 'json',
+ };
try {
- response = await this.axios(publisherValidationRequestConfig);
+ response = await this.got(publisherValidationRequestConfig);
} catch (e) {
this.logger.error(_scope, 'publisher validation failed', { ...logInfoData, error: e });
return false; // Do not continue with client verification.
}
- logInfoData.response = common.axiosResponseLogData(response);
+ logInfoData.response = common.gotResponseLogData(response);
this.logger.debug(_scope, 'validation response', logInfoData);
let verificationNeedsUpdate = false;
- switch (common.httpStatusCodeClass(response.status)) {
+ switch (common.httpStatusCodeClass(response.statusCode)) {
case 2:
this.logger.info(_scope, 'publisher validation complete, allowed', logInfoData);
break;
/**
* Retrieve content from a topic.
- * @param {*} dbCtx
- * @param {*} topicId
- * @param {String} requestId
- * @returns
+ * @param {*} dbCtx db context
+ * @param {*} topicId topic id
+ * @param {string} requestId request id
+ * @returns {Promise<void>}
*/
async topicFetchProcess(dbCtx, topicId, requestId) {
const _scope = _fileScope('topicFetchProcess');
const topic = await this.db.topicGetById(dbCtx, topicId);
if (topic === undefined) {
- this.logger.error(_scope, 'no such topic id', logInfoData);
- throw new Errors.InternalInconsistencyError('no such topic id');
+ this.logger.error(_scope, Enum.Message.NoSuchTopicId, logInfoData);
+ throw new Errors.InternalInconsistencyError(Enum.Message.NoSuchTopicId);
}
// Cull any expired subscriptions
await this.db.subscriptionDeleteExpired(dbCtx, topicId);
- logInfoData.url = topicId.url;
+ logInfoData.url = topic.url;
if (topic.isDeleted) {
this.logger.debug(_scope, 'topic deleted, skipping update request', logInfoData);
return;
}
- const updateRequestConfig = Communication._topicFetchAxiosConfig(topic);
+ const updateRequestConfig = {
+ followRedirect: true,
+ method: 'GET',
+ url: topic.url,
+ headers: {
+ [Enum.Header.Accept]: [topic.contentType, `*/*${topic.contentType ? ';q=0.9' : ''}`].filter((x) => x).join(', '),
+ ...(topic.httpEtag && { [Enum.Header.IfNoneMatch]: topic.httpEtag }),
+ ...(topic.httpLastModified && { [Enum.Header.IfModifiedSince]: topic.httpLastModified }),
+ },
+ responseType: 'buffer',
+ };
this.logger.info(_scope, 'topic update request', logInfoData);
let response;
try {
- response = await this.axios(updateRequestConfig);
+ response = await this.got(updateRequestConfig);
} catch (e) {
- this.logger.error(_scope, 'update request failed', logInfoData);
+ this.logger.error(_scope, 'update request failed', { ...logInfoData, error: e });
await this.db.topicFetchIncomplete(dbCtx, topicId, this.options.communication.retryBackoffSeconds);
return;
}
- logInfoData.response = common.axiosResponseLogData(response);
+ logInfoData.response = common.gotResponseLogData(response);
this.logger.debug(_scope, 'fetch response', logInfoData);
- switch (common.httpStatusCodeClass(response.status)) {
+ switch (common.httpStatusCodeClass(response.statusCode)) {
case 2:
+ case 3:
// Fall out of switch on success
break;
return;
}
- const contentHash = Communication.contentHash(response.data, topic.contentHashAlgorithm);
+ if (response.statusCode === 304) {
+ this.logger.info(_scope, 'content has not changed, per server', logInfoData);
+ await this.db.topicFetchComplete(dbCtx, topicId);
+ return;
+ }
+
+ const contentHash = Communication.contentHash(response.body, topic.contentHashAlgorithm);
logInfoData.contentHash = contentHash;
if (topic.contentHash === contentHash) {
this.logger.info(_scope, 'content has not changed', logInfoData);
return;
}
- const validHub = await this.linkHelper.validHub(topic.url, response.headers, response.data);
+ const validHub = await this.linkHelper.validHub(topic.url, response.headers, response.body);
if (!validHub) {
this.logger.info(_scope, 'retrieved topic does not list us as hub', { logInfoData });
if (this.options.communication.strictTopicHubLink) {
}
const contentType = response.headers[Enum.Header.ContentType.toLowerCase()];
+ const httpETag = response.headers[Enum.Header.ETag.toLowerCase()];
+ const httpLastModified = response.headers[Enum.Header.LastModified.toLowerCase()];
await this.db.transaction(dbCtx, async (txCtx) => {
await this.db.topicSetContent(txCtx, {
topicId,
- content: Buffer.from(response.data),
+ content: Buffer.from(response.body),
contentHash,
...(contentType && { contentType }),
+ ...(httpETag && { httpETag }),
+ ...(httpLastModified && { httpLastModified }),
});
await this.db.topicFetchComplete(txCtx, topicId);
/**
* Attempt to deliver a topic's content to a subscription.
- * @param {*} dbCtx
- * @param {String} callback
- * @param {*} topicId
- * @param {String} requestId
+ * @param {*} dbCtx db context
+ * @param {string} subscriptionId subscription id
+ * @param {string} requestId request id
+ * @returns {Promise<void>}
*/
async subscriptionDeliveryProcess(dbCtx, subscriptionId, requestId) {
const _scope = _fileScope('subscriptionDeliveryProcess');
await this.db.transaction(dbCtx, async (txCtx) => {
await this.db.verificationInsert(txCtx, verification);
- await this.db.subscriptionDeliveryComplete(txCtx, subscription.callback, subscription.topicId);
+ await this.db.subscriptionDeliveryComplete(txCtx, subscription.callback, subscription.topicId, topic.contentUpdated);
});
this.logger.info(_scope, 'update unsubscription for deleted topic', logInfoData);
return;
logInfoData.contentLength = topic.content.length;
logInfoData.contentHash = topic.contentHash;
- const updateAxiosConfig = Communication._axiosConfig('POST', subscription.callback, topic.content, {}, {
- [Enum.Header.Link]: `<${topic.url}>; rel="self"${this.linkHub}`,
- [Enum.Header.ContentType]: topic.contentType || Enum.ContentType.TextPlain,
- ...(subscription.secret && { [Enum.Header.XHubSignature]: Communication.signature(topic.content, subscription.secret, subscription.signatureAlgorithm) }),
- });
+ const updateConfig = {
+ method: 'POST',
+ url: subscription.callback,
+ body: topic.content,
+ headers: {
+ [Enum.Header.Link]: `<${topic.url}>; rel="self"${this.linkHub}`,
+ [Enum.Header.ContentType]: topic.contentType || Enum.ContentType.TextPlain,
+ ...(subscription.secret && { [Enum.Header.XHubSignature]: Communication.signature(topic.content, subscription.secret, subscription.signatureAlgorithm) }),
+ },
+ responseType: 'text',
+ };
this.logger.info(_scope, 'update request', logInfoData);
let response;
try {
- response = await this.axios(updateAxiosConfig);
+ response = await this.got(updateConfig);
} catch (e) {
this.logger.error(_scope, 'update request failed', { ...logInfoData, error: e });
await this.db.subscriptionDeliveryIncomplete(dbCtx, subscription.callback, subscription.topicId, this.options.communication.retryBackoffSeconds);
return;
}
- logInfoData.response = common.axiosResponseLogData(response);
+ logInfoData.response = common.gotResponseLogData(response);
this.logger.debug(_scope, 'update response', logInfoData);
- switch (common.httpStatusCodeClass(response.status)) {
+ switch (common.httpStatusCodeClass(response.statusCode)) {
case 2:
// Fall out of switch on success.
break;
return;
case 4:
- if (response.status === 410) { // GONE
+ if (response.statusCode === 410) { // GONE
this.logger.info(_scope, 'client declined further updates', logInfoData);
await this.db.subscriptionDeliveryGone(dbCtx, subscription.callback, subscription.topicId);
return;
return;
}
- await this.db.subscriptionDeliveryComplete(dbCtx, subscription.callback, subscription.topicId);
+ await this.db.subscriptionDeliveryComplete(dbCtx, subscription.callback, subscription.topicId, topic.contentUpdated);
this.logger.info(_scope, 'update success', logInfoData);
}
/**
* Claim and work a specific topic fetch task.
- * @param {*} dbCtx
- * @param {*} id
- * @param {String} requestId
+ * @param {*} dbCtx db context
+ * @param {string} topicId topic id
+ * @param {string} requestId request id
+ * @returns {Promise<void>}
*/
async topicFetchClaimAndProcessById(dbCtx, topicId, requestId) {
const _scope = _fileScope('topicFetchClaimAndProcessById');
/**
* Claim and work a specific verification confirmation task.
- * @param {*} dbCtx
- * @param {*} verificationId
- * @param {String} requestId
- * @returns
+ * @param {*} dbCtx db context
+ * @param {*} verificationId verification id
+ * @param {string} requestId request id
+ * @returns {Promise<boolean>} whether to subsequently attempt next task if verification succeeds
*/
async verificationClaimAndProcessById(dbCtx, verificationId, requestId) {
const _scope = _fileScope('verificationClaimAndProcessById');
/**
*
- * @param {*} dbCtx
- * @param {Number} wanted maximum tasks to claim
- * @returns {Promise<void>[]}
+ * @param {*} dbCtx db context
+ * @param {number} wanted maximum tasks to claim
+ * @returns {Promise<void>[]} array of promises processing work
*/
async workFeed(dbCtx, wanted) {
const _scope = _fileScope('workFeed');