4 * Here we wrangle all outgoing requests, as well as the
5 * worker which initiates most of them.
8 const axios
= require('axios');
9 const common
= require('./common');
10 const crypto
= require('crypto');
11 const Enum
= require('./enum');
12 const Errors
= require('./errors');
13 const Worker
= require('./worker');
14 const LinkHelper
= require('./link-helper');
15 const { version: packageVersion
, name: packageName
} = require('../package.json'); // For default UA string
17 const { performance
} = require('perf_hooks');
19 const _fileScope
= common
.fileScope(__filename
);
22 constructor(logger
, db
, options
) {
25 this.options
= options
;
26 this.linkHelper
= new LinkHelper(logger
, options
);
28 if (this.options
.dingus
.selfBaseUrl
) {
29 this.linkHub
= `, <${this.options.dingus.selfBaseUrl}>; rel="hub"`;
32 this.logger
.error(_fileScope('constructor'), 'empty dingus.selfBaseUrl value, server responses will not be compliant');
36 this.axios
= axios
.create({
37 validateStatus: null, // Non-success responses are not exceptional
39 [Enum
.Header
.UserAgent
]: Communication
.userAgentString(options
.userAgent
),
43 this.axios
.interceptors
.request
.use((request
) => {
44 request
.startTimestampMs
= performance
.now();
47 this.axios
.interceptors
.response
.use((response
) => {
48 response
.elapsedTimeMs
= performance
.now() - response
.config
.startTimestampMs
;
52 this.worker
= new Worker(logger
, this.workFeed
.bind(this), options
);
57 static userAgentString(userAgentConfig
) {
58 // eslint-disable-next-line security/detect-object-injection
59 const _conf
= (field
, def
) => (userAgentConfig
&& field
in userAgentConfig
) ? userAgentConfig
[field
] : def
;
60 const product
= _conf('product', packageName
);
61 const version
= _conf('version', packageVersion
);
62 let implementation
= _conf('implementation', Enum
.Specification
);
64 implementation
= ` (${implementation})`;
66 return `${product}/${version}${implementation}`;
71 * Generate a random string.
72 * @param {Integer} bytes
75 static async
generateChallenge(bytes
= 30) {
76 return (await common
.randomBytesAsync(bytes
)).toString('base64');
81 * Generate the signature string for content.
82 * @param {Buffer} message
83 * @param {Buffer} secret
84 * @param {String} algorithm
87 static signature(message
, secret
, algorithm
) {
88 const hmac
= crypto
.createHmac(algorithm
, secret
);
90 return `${algorithm}=${hmac.digest('hex')}`;
95 * Generate the hash for content.
96 * @param {Buffer} content
97 * @param {String} algorithm
100 static contentHash(content
, algorithm
) {
101 const hash
= crypto
.createHash(algorithm
);
102 hash
.update(content
);
103 return hash
.digest('hex');
108 * A request skeleton config.
109 * @param {String} method
110 * @param {String} requestUrl
111 * @param {String} body
112 * @param {Object} params
114 static _axiosConfig(method
, requestUrl
, body
, params
= {}, headers
= {}) {
115 const urlObj
= new URL(requestUrl
);
118 url: `${urlObj.origin}${urlObj.pathname}`,
119 params: urlObj
.searchParams
,
121 ...(body
&& { data: body
}),
122 // Setting this does not appear to be enough to keep axios from parsing JSON response into object
123 responseType: 'text',
124 // So force the matter by eliding all response transformations
125 transformResponse: [ (res
) => res
],
127 Object
.entries(params
).map(([k
, v
]) => config
.params
.set(k
, v
));
133 * Create request config for verifying an intent.
134 * @param {URL} requestUrl
135 * @param {String} topicUrl
136 * @param {String} mode
137 * @param {Integer} leaseSeconds
138 * @param {String} challenge
140 static _intentVerifyAxiosConfig(requestUrl
, topicUrl
, mode
, leaseSeconds
, challenge
) {
141 // Explicitly convert leaseSeconds to string, due to some DB backends. (Looking at you, sqlite..)
142 leaseSeconds
= leaseSeconds
.toString();
144 return Communication
._axiosConfig('GET', requestUrl
, undefined, {
146 'hub.topic': topicUrl
,
147 'hub.challenge': challenge
,
148 'hub.lease_seconds': leaseSeconds
,
154 * Create request config for denying an intent.
155 * @param {String} requestUrl
156 * @param {String} topicUrl
157 * @param {String} reason
160 static _intentDenyAxiosConfig(requestUrl
, topicUrl
, reason
) {
161 return Communication
._axiosConfig('GET', requestUrl
, undefined, {
162 'hub.mode': Enum
.Mode
.Denied
,
163 'hub.topic': topicUrl
,
164 ...(reason
&& { 'hub.reason': reason
}),
170 * Create request config for querying publisher for subscription validation.
171 * @param {Topic} topic
172 * @param {Verification} verification
175 static _publisherValidationAxiosConfig(topic
, verification
) {
177 callback: verification
.callback
,
179 ...(verification
.httpFrom
&& { from: verification
.httpFrom
}),
180 ...(verification
.httpRemoteAddr
&& { address: verification
.httpRemoteAddr
}),
182 return Communication
._axiosConfig('POST', topic
.publisherValidationUrl
, body
, {}, {
183 [Enum
.Header
.ContentType
]: Enum
.ContentType
.ApplicationJson
,
189 * Create request config for fetching topic content.
190 * Prefer existing content-type, but accept anything.
191 * @param {Topic} topic
194 static _topicFetchAxiosConfig(topic
) {
195 const acceptWildcard
= '*/*' + (topic
.contentType
? ';q=0.9' : '');
196 const acceptPreferred
= [topic
.contentType
, acceptWildcard
].filter((x
) => x
).join(', ');
197 return Communication
._axiosConfig('GET', topic
.url
, undefined, {}, {
198 [Enum
.Header
.Accept
]: acceptPreferred
,
204 * Attempt to verify a requested intent with client callback endpoint.
206 * @param {*} verificationId
207 * @param {String} requestId
208 * @returns {Boolean} whether to subsequently attempt next task if verification succeeds
210 async
verificationProcess(dbCtx
, verificationId
, requestId
) {
211 const _scope
= _fileScope('verificationProcess');
213 const verification
= await
this.db
.verificationGetById(dbCtx
, verificationId
);
215 this.logger
.error(_scope
, 'no such verification', { verificationId
, requestId
});
216 throw new Errors
.InternalInconsistencyError('no such verification id');
219 const topic
= await
this.db
.topicGetById(dbCtx
, verification
.topicId
);
221 this.logger
.error(_scope
, 'no such topic id', { verification
, requestId
});
222 throw new Errors
.InternalInconsistencyError('no such topic id');
225 if (!topic
.isActive
) {
226 this.logger
.debug(_scope
, 'topic not active, skipping verification', { verification
, requestId
});
227 await
this.db
.verificationRelease(dbCtx
, verificationId
);
231 // If topic is deleted, deny any subscriptions.
232 // Un-subscriptions can continue to be verified.
233 if (topic
.isDeleted
&& verification
.mode
=== Enum
.Mode
.Subscribe
) {
234 this.logger
.info(_scope
, 'topic is deleted, verification becomes denial', { verification
, requestId
});
236 verification
.mode
= Enum
.Mode
.Denied
;
237 verification
.reason
= 'Gone: topic no longer valid on this hub.';
238 verification
.isPublisherValidated
= true;
239 await
this.db
.verificationUpdate(dbCtx
, verification
);
242 // If verification needs publisher validation, this delivery is for publisher.
243 if (verification
.mode
=== Enum
.Mode
.Subscribe
&& verification
.isPublisherValidated
=== false) {
244 this.logger
.debug(_scope
, 'attempting publisher validation', { verification
, requestId
});
245 const continueVerification
= await
this.publisherValidate(dbCtx
, topic
, verification
);
247 // If publisher validation completed, verification will proceed.
248 // If not, we're done for now and shall try again later.
249 if (!continueVerification
) {
250 this.logger
.debug(_scope
, 'publisher validation did not complete, belaying verification', { verification
});
251 await
this.db
.verificationIncomplete(dbCtx
, verificationId
, this.options
.communication
.retryBackoffSeconds
);
256 const u
= new URL(verification
.callback
);
257 let callbackRequestConfig
, challenge
;
258 if (verification
.mode
=== Enum
.Mode
.Denied
) {
259 // Denials don't have a challenge.
260 callbackRequestConfig
= Communication
._intentDenyAxiosConfig(u
, topic
.url
, verification
.reason
);
262 // Subscriptions and unsubscriptions require challenge matching.
263 challenge
= await Communication
.generateChallenge();
264 callbackRequestConfig
= Communication
._intentVerifyAxiosConfig(u
, topic
.url
, verification
.mode
, verification
.leaseSeconds
, challenge
);
267 const logInfoData
= {
270 mode: verification
.mode
,
271 originalRequestId: verification
.requestId
,
276 this.logger
.info(_scope
, 'verification request', logInfoData
);
280 response
= await
this.axios(callbackRequestConfig
);
282 this.logger
.error(_scope
, 'verification request failed', { ...logInfoData
, error: e
});
283 await
this.db
.verificationIncomplete(dbCtx
, verificationId
, this.options
.communication
.retryBackoffSeconds
);
286 logInfoData
.response
= common
.axiosResponseLogData(response
);
287 this.logger
.debug(_scope
, 'verification response', logInfoData
);
289 let verificationAccepted
= true; // Presume success.
291 switch (common
.httpStatusCodeClass(response
.status
)) {
293 // Success, fall out of switch.
298 this.logger
.info(_scope
, 'verification remote server error', logInfoData
);
299 await
this.db
.verificationIncomplete(dbCtx
, verificationId
, this.options
.communication
.retryBackoffSeconds
);
303 // Anything else is unsuccessful.
304 this.logger
.info(_scope
, 'verification rejected by status', logInfoData
);
305 verificationAccepted
= false;
308 // Any denial is not accepted.
309 if (verification
.mode
=== Enum
.Mode
.Denied
) {
310 this.logger
.info(_scope
, 'verification denial accepted', logInfoData
);
311 verificationAccepted
= false;
314 if ([Enum
.Mode
.Subscribe
, Enum
.Mode
.Unsubscribe
].includes(verification
.mode
)
315 && response
.data
!== challenge
) {
316 this.logger
.info(_scope
, 'verification rejected by challenge', logInfoData
);
317 verificationAccepted
= false;
320 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
321 switch (verification
.mode
) {
322 case Enum
.Mode
.Subscribe:
323 if (verificationAccepted
) {
324 await
this.db
.subscriptionUpsert(txCtx
, verification
);
328 case Enum
.Mode
.Unsubscribe:
329 if (verificationAccepted
) {
330 await
this.db
.subscriptionDelete(txCtx
, verification
.callback
, verification
.topicId
);
334 case Enum
.Mode
.Denied:
335 await
this.db
.subscriptionDelete(txCtx
, verification
.callback
, verification
.topicId
);
339 this.logger
.error(_scope
, 'unanticipated mode', { logInfoData
});
340 throw new Errors
.InternalInconsistencyError(verification
.mode
);
343 await
this.db
.verificationComplete(dbCtx
, verificationId
, verification
.callback
, verification
.topicId
);
346 this.logger
.info(_scope
, 'verification complete', { ...logInfoData
, verificationAccepted
});
351 * Attempt to verify a pending subscription request with publisher.
352 * Updates (and persists) verification.
353 * Returns boolean of status of publisher contact, and hence
354 * whether to continue verification with client.
356 * @param {TopicData} topic
357 * @param {VerificationData} verification
360 async
publisherValidate(dbCtx
, topic
, verification
) {
361 const _scope
= _fileScope('publisherValidate');
362 const publisherValidationRequestConfig
= Communication
._publisherValidationAxiosConfig(topic
, verification
);
363 const logInfoData
= {
365 callbackUrl: verification
.callback
,
366 requestId: verification
.requestId
,
370 this.logger
.info(_scope
, 'publisher validation request', logInfoData
);
373 response
= await
this.axios(publisherValidationRequestConfig
);
375 this.logger
.error(_scope
, 'publisher validation failed', { ...logInfoData
, error: e
});
376 return false; // Do not continue with client verification.
379 logInfoData
.response
= common
.axiosResponseLogData(response
);
380 this.logger
.debug(_scope
, 'validation response', logInfoData
);
382 let verificationNeedsUpdate
= false;
383 switch (common
.httpStatusCodeClass(response
.status
)) {
385 this.logger
.info(_scope
, 'publisher validation complete, allowed', logInfoData
);
389 this.logger
.info(_scope
, 'publisher validation remote server error', logInfoData
);
390 return false; // Do not continue with client verification.
393 this.logger
.info(_scope
, 'publisher validation complete, denied', logInfoData
);
394 // Change client verification
395 verification
.mode
= Enum
.Mode
.Denied
;
396 verification
.reason
= 'publisher rejected request'; // TODO: details from response?
397 verificationNeedsUpdate
= true;
400 // Success from publisher, either accepted or denied.
401 // Set validated flag, and allow client verification to continue.
402 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
403 if (verificationNeedsUpdate
) {
404 await
this.db
.verificationUpdate(txCtx
, verification
.id
, verification
);
406 await
this.db
.verificationValidated(txCtx
, verification
.id
);
413 * Retrieve content from a topic.
416 * @param {String} requestId
419 async
topicFetchProcess(dbCtx
, topicId
, requestId
) {
420 const _scope
= _fileScope('topicFetchProcess');
421 const logInfoData
= {
426 this.logger
.debug(_scope
, 'called', logInfoData
);
428 const topic
= await
this.db
.topicGetById(dbCtx
, topicId
);
429 if (topic
=== undefined) {
430 this.logger
.error(_scope
, 'no such topic id', logInfoData
);
431 throw new Errors
.InternalInconsistencyError('no such topic id');
434 logInfoData
.url
= topicId
.url
;
436 if (topic
.isDeleted
) {
437 this.logger
.debug(_scope
, 'topic deleted, skipping update request', logInfoData
);
441 const updateRequestConfig
= Communication
._topicFetchAxiosConfig(topic
);
443 this.logger
.info(_scope
, 'topic update request', logInfoData
);
447 response
= await
this.axios(updateRequestConfig
);
449 this.logger
.error(_scope
, 'update request failed', logInfoData
);
450 await
this.db
.topicFetchIncomplete(dbCtx
, topicId
, this.options
.communication
.retryBackoffSeconds
);
453 logInfoData
.response
= common
.axiosResponseLogData(response
);
454 this.logger
.debug(_scope
, 'fetch response', logInfoData
);
456 switch (common
.httpStatusCodeClass(response
.status
)) {
458 // Fall out of switch on success
462 this.logger
.info(_scope
, 'update remote server error', logInfoData
);
463 await
this.db
.topicFetchIncomplete(dbCtx
, topicId
, this.options
.communication
.retryBackoffSeconds
);
467 this.logger
.info(_scope
, 'fetch failed by status', logInfoData
);
468 await
this.db
.topicFetchIncomplete(dbCtx
, topicId
, this.options
.communication
.retryBackoffSeconds
);
472 const contentHash
= Communication
.contentHash(response
.data
, topic
.contentHashAlgorithm
);
473 logInfoData
.contentHash
= contentHash
;
474 if (topic
.contentHash
=== contentHash
) {
475 this.logger
.info(_scope
, 'content has not changed', logInfoData
);
476 await
this.db
.topicFetchComplete(dbCtx
, topicId
);
480 const validHub
= await
this.linkHelper
.validHub(topic
.url
, response
.headers
, response
.data
);
482 this.logger
.debug(_scope
, 'retrieved topic does not list us as hub', { logInfoData
});
483 if (this.options
.communication
.strictTopicHubLink
) {
484 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
485 // Set as deleted and set content_updated so subscriptions are notified.
486 await
this.db
.topicDeleted(txCtx
, topicId
);
487 await
this.db
.topicFetchComplete(txCtx
, topicId
);
493 const contentType
= response
.headers
[Enum
.Header
.ContentType
.toLowerCase()];
495 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
496 await
this.db
.topicSetContent(txCtx
, {
498 content: Buffer
.from(response
.data
),
500 ...(contentType
&& { contentType
}),
503 await
this.db
.topicFetchComplete(txCtx
, topicId
);
505 this.logger
.info(_scope
, 'content updated', logInfoData
);
510 * Attempt to deliver a topic's content to a subscription.
512 * @param {String} callback
514 * @param {String} requestId
516 async
subscriptionDeliveryProcess(dbCtx
, subscriptionId
, requestId
) {
517 const _scope
= _fileScope('subscriptionDeliveryProcess');
519 const logInfoData
= {
524 this.logger
.debug(_scope
, 'called', logInfoData
);
526 const subscription
= await
this.db
.subscriptionGetById(dbCtx
, subscriptionId
);
528 this.logger
.error(_scope
, 'no such subscription', logInfoData
);
529 throw new Errors
.InternalInconsistencyError('no such subscription');
532 logInfoData
.callback
= subscription
.callback
;
534 const topic
= await
this.db
.topicGetContentById(dbCtx
, subscription
.topicId
);
536 this.logger
.error(_scope
, 'no such topic', logInfoData
);
537 throw new Errors
.InternalInconsistencyError('no such topic');
540 if (topic
.isDeleted
) {
541 // If a topic has been set deleted, it does not list us as a valid hub.
542 // Queue an unsubscription.
543 const verification
= {
544 topicId: subscription
.topicId
,
545 callback: subscription
.callback
,
546 mode: Enum
.Mode
.Denied
,
547 reason: 'Gone: topic no longer valid on this hub.',
548 isPublisherValidated: true,
552 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
553 await
this.db
.verificationInsert(txCtx
, verification
);
554 await
this.db
.subscriptionDeliveryComplete(txCtx
, subscription
.callback
, subscription
.topicId
);
556 this.logger
.info(_scope
, 'update unsubscription for deleted topic', logInfoData
);
560 logInfoData
.contentLength
= topic
.content
.length
;
561 logInfoData
.contentHash
= topic
.contentHash
;
563 const updateAxiosConfig
= Communication
._axiosConfig('POST', subscription
.callback
, topic
.content
, {}, {
564 [Enum
.Header
.Link
]: `<${topic.url}>; rel="self"${this.linkHub}`,
565 [Enum
.Header
.ContentType
]: topic
.contentType
|| Enum
.ContentType
.TextPlain
,
566 ...(subscription
.secret
&& { [Enum
.Header
.XHubSignature
]: Communication
.signature(topic
.content
, subscription
.secret
, subscription
.signatureAlgorithm
) }),
569 this.logger
.info(_scope
, 'update request', logInfoData
);
573 response
= await
this.axios(updateAxiosConfig
);
575 this.logger
.error(_scope
, 'update request failed', { ...logInfoData
, error: e
});
576 await
this.db
.subscriptionDeliveryIncomplete(dbCtx
, subscription
.callback
, subscription
.topicId
, this.options
.communication
.retryBackoffSeconds
);
579 logInfoData
.response
= common
.axiosResponseLogData(response
);
580 this.logger
.debug(_scope
, 'update response', logInfoData
);
582 switch (common
.httpStatusCodeClass(response
.status
)) {
584 // Fall out of switch on success.
588 this.logger
.info(_scope
, 'update remote server error', logInfoData
);
589 await
this.db
.subscriptionDeliveryIncomplete(dbCtx
, subscription
.callback
, subscription
.topicId
, this.options
.communication
.retryBackoffSeconds
);
593 if (response
.status
=== 410) { // GONE
594 this.logger
.info(_scope
, 'client declined further updates', logInfoData
);
595 await
this.db
.subscriptionDeliveryGone(dbCtx
, subscription
.callback
, subscription
.topicId
);
598 // All other 4xx falls through as failure
601 this.logger
.info(_scope
, 'update failed with non-2xx status code', logInfoData
);
602 await
this.db
.subscriptionDeliveryIncomplete(dbCtx
, subscription
.callback
, subscription
.topicId
, this.options
.communication
.retryBackoffSeconds
);
606 await
this.db
.subscriptionDeliveryComplete(dbCtx
, subscription
.callback
, subscription
.topicId
);
607 this.logger
.info(_scope
, 'update success', logInfoData
);
612 * Claim and work a specific topic fetch task.
615 * @param {String} requestId
617 async
topicFetchClaimAndProcessById(dbCtx
, topicId
, requestId
) {
618 const _scope
= _fileScope('topicFetchClaimAndProcessById');
620 const claimResult
= await
this.db
.topicFetchClaimById(dbCtx
, topicId
, this.options
.communication
.claimTimeoutSeconds
, this.options
.nodeId
);
621 if (claimResult
.changes
!= 1) {
622 this.logger
.debug(_scope
, 'did not claim topic fetch', { topicId
, requestId
});
625 await
this.topicFetchProcess(dbCtx
, topicId
, requestId
);
630 * Claim and work a specific verification confirmation task.
632 * @param {*} verificationId
633 * @param {String} requestId
636 async
verificationClaimAndProcessById(dbCtx
, verificationId
, requestId
) {
637 const _scope
= _fileScope('verificationClaimAndProcessById');
639 const claimResult
= await
this.db
.verificationClaimById(dbCtx
, verificationId
, this.options
.communication
.claimTimeoutSeconds
, this.options
.nodeId
);
640 if (claimResult
.changes
!= 1) {
641 this.logger
.debug(_scope
, 'did not claim verification', { verificationId
, requestId
});
644 await
this.verificationProcess(dbCtx
, verificationId
, requestId
);
650 * @param {Number} wanted maximum tasks to claim
651 * @returns {Promise<void>[]}
653 async
workFeed(wanted
) {
654 const _scope
= _fileScope('workFeed');
655 const inProgress
= [];
656 const requestId
= common
.requestId();
657 const claimTimeoutSeconds
= this.options
.communication
.claimTimeoutSeconds
;
658 const nodeId
= this.options
.nodeId
;
659 let topicFetchPromises
= [], verificationPromises
= [], updatePromises
= [];
661 this.logger
.debug(_scope
, 'called', { wanted
});
664 await
this.db
.context(async (dbCtx
) => {
666 // Update topics before anything else.
667 const topicFetchIds
= await
this.db
.topicFetchClaim(dbCtx
, wanted
, claimTimeoutSeconds
, nodeId
);
668 // Each task gets a new context, as these map to connections in some dbs.
669 // This dbCtx goes away after launching the processing functions, so would not be available to tasks.
670 topicFetchPromises
= topicFetchIds
.map((id
) => this.db
.context((ctx
) => this.topicFetchProcess(ctx
, id
, requestId
)));
671 inProgress
.push(...topicFetchPromises
);
672 wanted
-= topicFetchPromises
.length
;
676 // Then any pending verifications.
677 const verifications
= await
this.db
.verificationClaim(dbCtx
, wanted
, claimTimeoutSeconds
, nodeId
);
678 verificationPromises
= verifications
.map((id
) => this.db
.context((ctx
) => this.verificationProcess(ctx
, id
, requestId
)));
679 inProgress
.push(...verificationPromises
);
680 wanted
-= verificationPromises
.length
;
684 // Finally dole out content.
685 const updates
= await
this.db
.subscriptionDeliveryClaim(dbCtx
, wanted
, claimTimeoutSeconds
, nodeId
);
686 updatePromises
= updates
.map((id
) => this.db
.context((ctx
) => this.subscriptionDeliveryProcess(ctx
, id
, requestId
)));
687 inProgress
.push(...updatePromises
);
688 wanted
-= updatePromises
.length
;
692 this.logger
.error(_scope
, 'failed', { error: e
});
693 // do not re-throw, return what we've claimed so far
695 this.logger
.debug(_scope
, 'searched for work', { topics: topicFetchPromises
.length
, verifications: verificationPromises
.length
, updates: updatePromises
.length
, wantedRemaining: wanted
, requestId
});
703 module
.exports
= Communication
;