4 * Here we wrangle all outgoing requests, as well as the
5 * worker which initiates most of them.
8 const axios
= require('axios');
9 const common
= require('./common');
10 const crypto
= require('crypto');
11 const Enum
= require('./enum');
12 const Errors
= require('./errors');
13 const Worker
= require('./worker');
14 const LinkHelper
= require('./link-helper');
15 const { version: packageVersion
, name: packageName
} = require('../package.json'); // For default UA string
17 const { performance
} = require('perf_hooks');
19 const _fileScope
= common
.fileScope(__filename
);
22 constructor(logger
, db
, options
) {
25 this.options
= options
;
26 this.linkHelper
= new LinkHelper(logger
, options
);
28 if (this.options
.dingus
.selfBaseUrl
) {
29 this.linkHub
= `, <${this.options.dingus.selfBaseUrl}>; rel="hub"`;
32 this.logger
.error(_fileScope('constructor'), 'empty dingus.selfBaseUrl value, server responses will not be compliant');
36 this.axios
= axios
.create({
37 validateStatus: null, // Non-success responses are not exceptional
39 [Enum
.Header
.UserAgent
]: Communication
.userAgentString(options
.userAgent
),
43 this.axios
.interceptors
.request
.use((request
) => {
44 request
.startTimestampMs
= performance
.now();
47 this.axios
.interceptors
.response
.use((response
) => {
48 response
.elapsedTimeMs
= performance
.now() - response
.config
.startTimestampMs
;
52 this.worker
= new Worker(logger
, db
, this.workFeed
.bind(this), options
);
57 static userAgentString(userAgentConfig
) {
58 // eslint-disable-next-line security/detect-object-injection
59 const _conf
= (field
, def
) => (userAgentConfig
&& field
in userAgentConfig
) ? userAgentConfig
[field
] : def
;
60 const product
= _conf('product', packageName
).split('/').pop();
61 const version
= _conf('version', packageVersion
);
62 let implementation
= _conf('implementation', Enum
.Specification
);
64 implementation
= ` (${implementation})`;
66 return `${product}/${version}${implementation}`;
71 * Generate a random string.
72 * @param {Integer} bytes
75 static async
generateChallenge(bytes
= 30) {
76 return (await common
.randomBytesAsync(bytes
)).toString('base64');
81 * Generate the signature string for content.
82 * @param {Buffer} message
83 * @param {Buffer} secret
84 * @param {String} algorithm
87 static signature(message
, secret
, algorithm
) {
88 const hmac
= crypto
.createHmac(algorithm
, secret
);
90 return `${algorithm}=${hmac.digest('hex')}`;
95 * Generate the hash for content.
96 * @param {Buffer} content
97 * @param {String} algorithm
100 static contentHash(content
, algorithm
) {
101 const hash
= crypto
.createHash(algorithm
);
102 hash
.update(content
);
103 return hash
.digest('hex');
108 * A request skeleton config.
109 * @param {String} method
110 * @param {String} requestUrl
111 * @param {String} body
112 * @param {Object} params
114 static _axiosConfig(method
, requestUrl
, body
, params
= {}, headers
= {}) {
115 const urlObj
= new URL(requestUrl
);
118 url: `${urlObj.origin}${urlObj.pathname}`,
119 params: urlObj
.searchParams
,
121 ...(body
&& { data: body
}),
122 // Setting this does not appear to be enough to keep axios from parsing JSON response into object
123 responseType: 'text',
124 // So force the matter by eliding all response transformations
125 transformResponse: [ (res
) => res
],
127 Object
.entries(params
).map(([k
, v
]) => config
.params
.set(k
, v
));
133 * Create request config for verifying an intent.
134 * @param {URL} requestUrl
135 * @param {String} topicUrl
136 * @param {String} mode
137 * @param {Integer} leaseSeconds
138 * @param {String} challenge
140 static _intentVerifyAxiosConfig(requestUrl
, topicUrl
, mode
, leaseSeconds
, challenge
) {
141 // Explicitly convert leaseSeconds to string, due to some DB backends. (Looking at you, sqlite..)
142 leaseSeconds
= leaseSeconds
.toString();
144 return Communication
._axiosConfig('GET', requestUrl
, undefined, {
146 'hub.topic': topicUrl
,
147 'hub.challenge': challenge
,
148 'hub.lease_seconds': leaseSeconds
,
154 * Create request config for denying an intent.
155 * @param {String} requestUrl
156 * @param {String} topicUrl
157 * @param {String} reason
160 static _intentDenyAxiosConfig(requestUrl
, topicUrl
, reason
) {
161 return Communication
._axiosConfig('GET', requestUrl
, undefined, {
162 'hub.mode': Enum
.Mode
.Denied
,
163 'hub.topic': topicUrl
,
164 ...(reason
&& { 'hub.reason': reason
}),
170 * Create request config for querying publisher for subscription validation.
171 * @param {Topic} topic
172 * @param {Verification} verification
175 static _publisherValidationAxiosConfig(topic
, verification
) {
177 callback: verification
.callback
,
179 ...(verification
.httpFrom
&& { from: verification
.httpFrom
}),
180 ...(verification
.httpRemoteAddr
&& { address: verification
.httpRemoteAddr
}),
182 return Communication
._axiosConfig('POST', topic
.publisherValidationUrl
, body
, {}, {
183 [Enum
.Header
.ContentType
]: Enum
.ContentType
.ApplicationJson
,
189 * Create request config for fetching topic content.
190 * Prefer existing content-type, but accept anything.
191 * @param {Topic} topic
194 static _topicFetchAxiosConfig(topic
) {
195 const acceptWildcard
= '*/*' + (topic
.contentType
? ';q=0.9' : '');
196 const acceptPreferred
= [topic
.contentType
, acceptWildcard
].filter((x
) => x
).join(', ');
197 return Communication
._axiosConfig('GET', topic
.url
, undefined, {}, {
198 [Enum
.Header
.Accept
]: acceptPreferred
,
204 * Attempt to verify a requested intent with client callback endpoint.
206 * @param {*} verificationId
207 * @param {String} requestId
208 * @returns {Boolean} whether to subsequently attempt next task if verification succeeds
210 async
verificationProcess(dbCtx
, verificationId
, requestId
) {
211 const _scope
= _fileScope('verificationProcess');
213 const verification
= await
this.db
.verificationGetById(dbCtx
, verificationId
);
215 this.logger
.error(_scope
, 'no such verification', { verificationId
, requestId
});
216 throw new Errors
.InternalInconsistencyError('no such verification id');
219 const topic
= await
this.db
.topicGetById(dbCtx
, verification
.topicId
);
221 this.logger
.error(_scope
, 'no such topic id', { verification
, requestId
});
222 throw new Errors
.InternalInconsistencyError('no such topic id');
225 if (!topic
.isActive
) {
226 // These should be filtered out when selecting verification tasks to process.
227 this.logger
.debug(_scope
, 'topic not active, skipping verification', { verification
, requestId
});
228 await
this.db
.verificationRelease(dbCtx
, verificationId
);
232 // If topic is deleted, deny any subscriptions.
233 // Un-subscriptions can continue to be verified.
234 if (topic
.isDeleted
&& verification
.mode
=== Enum
.Mode
.Subscribe
) {
235 this.logger
.info(_scope
, 'topic is deleted, verification becomes denial', { verification
, requestId
});
237 verification
.mode
= Enum
.Mode
.Denied
;
238 verification
.reason
= 'Gone: topic no longer valid on this hub.';
239 verification
.isPublisherValidated
= true;
240 await
this.db
.verificationUpdate(dbCtx
, verification
);
243 // If verification needs publisher validation, this delivery is for publisher.
244 if (verification
.mode
=== Enum
.Mode
.Subscribe
&& verification
.isPublisherValidated
=== false) {
245 this.logger
.debug(_scope
, 'attempting publisher validation', { verification
, requestId
});
246 const continueVerification
= await
this.publisherValidate(dbCtx
, topic
, verification
);
248 // If publisher validation completed, verification will proceed.
249 // If not, we're done for now and shall try again later.
250 if (!continueVerification
) {
251 this.logger
.debug(_scope
, 'publisher validation did not complete, belaying verification', { verification
});
252 await
this.db
.verificationIncomplete(dbCtx
, verificationId
, this.options
.communication
.retryBackoffSeconds
);
257 const u
= new URL(verification
.callback
);
258 let callbackRequestConfig
, challenge
;
259 if (verification
.mode
=== Enum
.Mode
.Denied
) {
260 // Denials don't have a challenge.
261 callbackRequestConfig
= Communication
._intentDenyAxiosConfig(u
, topic
.url
, verification
.reason
);
263 // Subscriptions and unsubscriptions require challenge matching.
264 challenge
= await Communication
.generateChallenge();
265 callbackRequestConfig
= Communication
._intentVerifyAxiosConfig(u
, topic
.url
, verification
.mode
, verification
.leaseSeconds
, challenge
);
268 const logInfoData
= {
271 mode: verification
.mode
,
272 originalRequestId: verification
.requestId
,
277 this.logger
.info(_scope
, 'verification request', logInfoData
);
281 response
= await
this.axios(callbackRequestConfig
);
283 this.logger
.error(_scope
, 'verification request failed', { ...logInfoData
, error: e
});
284 await
this.db
.verificationIncomplete(dbCtx
, verificationId
, this.options
.communication
.retryBackoffSeconds
);
287 logInfoData
.response
= common
.axiosResponseLogData(response
);
288 this.logger
.debug(_scope
, 'verification response', logInfoData
);
290 let verificationAccepted
= true; // Presume success.
292 switch (common
.httpStatusCodeClass(response
.status
)) {
294 // Success, fall out of switch.
299 this.logger
.info(_scope
, 'verification remote server error', logInfoData
);
300 await
this.db
.verificationIncomplete(dbCtx
, verificationId
, this.options
.communication
.retryBackoffSeconds
);
304 // Anything else is unsuccessful.
305 this.logger
.info(_scope
, 'verification rejected by status', logInfoData
);
306 verificationAccepted
= false;
309 // Any denial is not accepted.
310 if (verification
.mode
=== Enum
.Mode
.Denied
) {
311 this.logger
.info(_scope
, 'verification denial accepted', logInfoData
);
312 verificationAccepted
= false;
315 if ([Enum
.Mode
.Subscribe
, Enum
.Mode
.Unsubscribe
].includes(verification
.mode
)
316 && response
.data
!== challenge
) {
317 this.logger
.info(_scope
, 'verification rejected by challenge', logInfoData
);
318 verificationAccepted
= false;
321 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
322 switch (verification
.mode
) {
323 case Enum
.Mode
.Subscribe:
324 if (verificationAccepted
) {
325 await
this.db
.subscriptionUpsert(txCtx
, verification
);
329 case Enum
.Mode
.Unsubscribe:
330 if (verificationAccepted
) {
331 await
this.db
.subscriptionDelete(txCtx
, verification
.callback
, verification
.topicId
);
332 if (topic
.isDeleted
) {
333 // Remove a deleted topic after the last subscription is notified.
334 await
this.db
.topicPendingDelete(txCtx
, topic
.id
);
339 case Enum
.Mode
.Denied:
340 await
this.db
.subscriptionDelete(txCtx
, verification
.callback
, verification
.topicId
);
341 if (topic
.isDeleted
) {
342 // Remove a deleted topic after he last subscription is notified.
343 await
this.db
.topicPendingDelete(txCtx
, topic
.id
);
348 this.logger
.error(_scope
, 'unanticipated mode', { logInfoData
});
349 throw new Errors
.InternalInconsistencyError(verification
.mode
);
352 await
this.db
.verificationComplete(txCtx
, verificationId
, verification
.callback
, verification
.topicId
);
355 this.logger
.info(_scope
, 'verification complete', { ...logInfoData
, verificationAccepted
});
360 * Attempt to verify a pending subscription request with publisher.
361 * Updates (and persists) verification.
362 * Returns boolean of status of publisher contact, and hence
363 * whether to continue verification with client.
365 * @param {TopicData} topic
366 * @param {VerificationData} verification
369 async
publisherValidate(dbCtx
, topic
, verification
) {
370 const _scope
= _fileScope('publisherValidate');
371 const publisherValidationRequestConfig
= Communication
._publisherValidationAxiosConfig(topic
, verification
);
372 const logInfoData
= {
374 callbackUrl: verification
.callback
,
375 requestId: verification
.requestId
,
379 this.logger
.info(_scope
, 'publisher validation request', logInfoData
);
382 response
= await
this.axios(publisherValidationRequestConfig
);
384 this.logger
.error(_scope
, 'publisher validation failed', { ...logInfoData
, error: e
});
385 return false; // Do not continue with client verification.
388 logInfoData
.response
= common
.axiosResponseLogData(response
);
389 this.logger
.debug(_scope
, 'validation response', logInfoData
);
391 let verificationNeedsUpdate
= false;
392 switch (common
.httpStatusCodeClass(response
.status
)) {
394 this.logger
.info(_scope
, 'publisher validation complete, allowed', logInfoData
);
398 this.logger
.info(_scope
, 'publisher validation remote server error', logInfoData
);
399 return false; // Do not continue with client verification.
402 this.logger
.info(_scope
, 'publisher validation complete, denied', logInfoData
);
403 // Change client verification
404 verification
.mode
= Enum
.Mode
.Denied
;
405 verification
.reason
= 'publisher rejected request'; // TODO: details from response?
406 verificationNeedsUpdate
= true;
409 // Success from publisher, either accepted or denied.
410 // Set validated flag, and allow client verification to continue.
411 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
412 if (verificationNeedsUpdate
) {
413 await
this.db
.verificationUpdate(txCtx
, verification
.id
, verification
);
415 await
this.db
.verificationValidated(txCtx
, verification
.id
);
422 * Retrieve content from a topic.
425 * @param {String} requestId
428 async
topicFetchProcess(dbCtx
, topicId
, requestId
) {
429 const _scope
= _fileScope('topicFetchProcess');
430 const logInfoData
= {
435 this.logger
.debug(_scope
, 'called', logInfoData
);
437 const topic
= await
this.db
.topicGetById(dbCtx
, topicId
);
438 if (topic
=== undefined) {
439 this.logger
.error(_scope
, 'no such topic id', logInfoData
);
440 throw new Errors
.InternalInconsistencyError('no such topic id');
443 // Cull any expired subscriptions
444 await
this.db
.subscriptionDeleteExpired(dbCtx
, topicId
);
446 logInfoData
.url
= topic
.url
;
448 if (topic
.isDeleted
) {
449 this.logger
.debug(_scope
, 'topic deleted, skipping update request', logInfoData
);
453 const updateRequestConfig
= Communication
._topicFetchAxiosConfig(topic
);
455 this.logger
.info(_scope
, 'topic update request', logInfoData
);
459 response
= await
this.axios(updateRequestConfig
);
461 this.logger
.error(_scope
, 'update request failed', logInfoData
);
462 await
this.db
.topicFetchIncomplete(dbCtx
, topicId
, this.options
.communication
.retryBackoffSeconds
);
465 logInfoData
.response
= common
.axiosResponseLogData(response
);
466 this.logger
.debug(_scope
, 'fetch response', logInfoData
);
468 switch (common
.httpStatusCodeClass(response
.status
)) {
470 // Fall out of switch on success
474 this.logger
.info(_scope
, 'update remote server error', logInfoData
);
475 await
this.db
.topicFetchIncomplete(dbCtx
, topicId
, this.options
.communication
.retryBackoffSeconds
);
479 this.logger
.info(_scope
, 'fetch failed by status', logInfoData
);
480 await
this.db
.topicFetchIncomplete(dbCtx
, topicId
, this.options
.communication
.retryBackoffSeconds
);
484 const contentHash
= Communication
.contentHash(response
.data
, topic
.contentHashAlgorithm
);
485 logInfoData
.contentHash
= contentHash
;
486 if (topic
.contentHash
=== contentHash
) {
487 this.logger
.info(_scope
, 'content has not changed', logInfoData
);
488 await
this.db
.topicFetchComplete(dbCtx
, topicId
);
492 const validHub
= await
this.linkHelper
.validHub(topic
.url
, response
.headers
, response
.data
);
494 this.logger
.info(_scope
, 'retrieved topic does not list us as hub', { logInfoData
});
495 if (this.options
.communication
.strictTopicHubLink
) {
496 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
497 // Set as deleted and set content_updated so subscriptions are notified.
498 await
this.db
.topicDeleted(txCtx
, topicId
);
499 await
this.db
.topicFetchComplete(txCtx
, topicId
);
501 // Attempt to remove from db, if no active subscriptions.
502 await
this.db
.topicPendingDelete(dbCtx
, topicId
);
507 const contentType
= response
.headers
[Enum
.Header
.ContentType
.toLowerCase()];
509 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
510 await
this.db
.topicSetContent(txCtx
, {
512 content: Buffer
.from(response
.data
),
514 ...(contentType
&& { contentType
}),
517 await
this.db
.topicFetchComplete(txCtx
, topicId
);
519 this.logger
.info(_scope
, 'content updated', logInfoData
);
524 * Attempt to deliver a topic's content to a subscription.
526 * @param {String} callback
528 * @param {String} requestId
530 async
subscriptionDeliveryProcess(dbCtx
, subscriptionId
, requestId
) {
531 const _scope
= _fileScope('subscriptionDeliveryProcess');
533 const logInfoData
= {
538 this.logger
.debug(_scope
, 'called', logInfoData
);
540 const subscription
= await
this.db
.subscriptionGetById(dbCtx
, subscriptionId
);
542 this.logger
.error(_scope
, 'no such subscription', logInfoData
);
543 throw new Errors
.InternalInconsistencyError('no such subscription');
546 logInfoData
.callback
= subscription
.callback
;
548 const topic
= await
this.db
.topicGetContentById(dbCtx
, subscription
.topicId
);
550 this.logger
.error(_scope
, 'no such topic', logInfoData
);
551 throw new Errors
.InternalInconsistencyError('no such topic');
554 if (topic
.isDeleted
) {
555 // If a topic has been set deleted, it does not list us as a valid hub.
556 // Queue an unsubscription.
557 const verification
= {
558 topicId: subscription
.topicId
,
559 callback: subscription
.callback
,
560 mode: Enum
.Mode
.Denied
,
561 reason: 'Gone: topic no longer valid on this hub.',
562 isPublisherValidated: true,
566 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
567 await
this.db
.verificationInsert(txCtx
, verification
);
568 await
this.db
.subscriptionDeliveryComplete(txCtx
, subscription
.callback
, subscription
.topicId
);
570 this.logger
.info(_scope
, 'update unsubscription for deleted topic', logInfoData
);
574 logInfoData
.contentLength
= topic
.content
.length
;
575 logInfoData
.contentHash
= topic
.contentHash
;
577 const updateAxiosConfig
= Communication
._axiosConfig('POST', subscription
.callback
, topic
.content
, {}, {
578 [Enum
.Header
.Link
]: `<${topic.url}>; rel="self"${this.linkHub}`,
579 [Enum
.Header
.ContentType
]: topic
.contentType
|| Enum
.ContentType
.TextPlain
,
580 ...(subscription
.secret
&& { [Enum
.Header
.XHubSignature
]: Communication
.signature(topic
.content
, subscription
.secret
, subscription
.signatureAlgorithm
) }),
583 this.logger
.info(_scope
, 'update request', logInfoData
);
587 response
= await
this.axios(updateAxiosConfig
);
589 this.logger
.error(_scope
, 'update request failed', { ...logInfoData
, error: e
});
590 await
this.db
.subscriptionDeliveryIncomplete(dbCtx
, subscription
.callback
, subscription
.topicId
, this.options
.communication
.retryBackoffSeconds
);
593 logInfoData
.response
= common
.axiosResponseLogData(response
);
594 this.logger
.debug(_scope
, 'update response', logInfoData
);
596 switch (common
.httpStatusCodeClass(response
.status
)) {
598 // Fall out of switch on success.
602 this.logger
.info(_scope
, 'update remote server error', logInfoData
);
603 await
this.db
.subscriptionDeliveryIncomplete(dbCtx
, subscription
.callback
, subscription
.topicId
, this.options
.communication
.retryBackoffSeconds
);
607 if (response
.status
=== 410) { // GONE
608 this.logger
.info(_scope
, 'client declined further updates', logInfoData
);
609 await
this.db
.subscriptionDeliveryGone(dbCtx
, subscription
.callback
, subscription
.topicId
);
612 // All other 4xx falls through as failure
615 this.logger
.info(_scope
, 'update failed with non-2xx status code', logInfoData
);
616 await
this.db
.subscriptionDeliveryIncomplete(dbCtx
, subscription
.callback
, subscription
.topicId
, this.options
.communication
.retryBackoffSeconds
);
620 await
this.db
.subscriptionDeliveryComplete(dbCtx
, subscription
.callback
, subscription
.topicId
);
621 this.logger
.info(_scope
, 'update success', logInfoData
);
626 * Claim and work a specific topic fetch task.
629 * @param {String} requestId
631 async
topicFetchClaimAndProcessById(dbCtx
, topicId
, requestId
) {
632 const _scope
= _fileScope('topicFetchClaimAndProcessById');
634 const claimResult
= await
this.db
.topicFetchClaimById(dbCtx
, topicId
, this.options
.communication
.claimTimeoutSeconds
, this.options
.nodeId
);
635 if (claimResult
.changes
!= 1) {
636 this.logger
.debug(_scope
, 'did not claim topic fetch', { topicId
, requestId
});
639 await
this.topicFetchProcess(dbCtx
, topicId
, requestId
);
644 * Claim and work a specific verification confirmation task.
646 * @param {*} verificationId
647 * @param {String} requestId
650 async
verificationClaimAndProcessById(dbCtx
, verificationId
, requestId
) {
651 const _scope
= _fileScope('verificationClaimAndProcessById');
653 const claimResult
= await
this.db
.verificationClaimById(dbCtx
, verificationId
, this.options
.communication
.claimTimeoutSeconds
, this.options
.nodeId
);
654 if (claimResult
.changes
!= 1) {
655 this.logger
.debug(_scope
, 'did not claim verification', { verificationId
, requestId
});
658 await
this.verificationProcess(dbCtx
, verificationId
, requestId
);
665 * @param {Number} wanted maximum tasks to claim
666 * @returns {Promise<void>[]}
668 async
workFeed(dbCtx
, wanted
) {
669 const _scope
= _fileScope('workFeed');
670 const inProgress
= [];
671 const requestId
= common
.requestId();
672 const claimTimeoutSeconds
= this.options
.communication
.claimTimeoutSeconds
;
673 const nodeId
= this.options
.nodeId
;
674 let topicFetchPromises
= [], verificationPromises
= [], updatePromises
= [];
676 this.logger
.debug(_scope
, 'called', { wanted
});
680 // Update topics before anything else.
681 const topicFetchIds
= await
this.db
.topicFetchClaim(dbCtx
, wanted
, claimTimeoutSeconds
, nodeId
);
682 topicFetchPromises
= topicFetchIds
.map((id
) => this.db
.context((ctx
) => this.topicFetchProcess(ctx
, id
, requestId
)));
683 inProgress
.push(...topicFetchPromises
);
684 wanted
-= topicFetchPromises
.length
;
688 // Then any pending verifications.
689 const verifications
= await
this.db
.verificationClaim(dbCtx
, wanted
, claimTimeoutSeconds
, nodeId
);
690 verificationPromises
= verifications
.map((id
) => this.db
.context((ctx
) => this.verificationProcess(ctx
, id
, requestId
)));
691 inProgress
.push(...verificationPromises
);
692 wanted
-= verificationPromises
.length
;
696 // Finally dole out content.
697 const updates
= await
this.db
.subscriptionDeliveryClaim(dbCtx
, wanted
, claimTimeoutSeconds
, nodeId
);
698 updatePromises
= updates
.map((id
) => this.db
.context((ctx
) => this.subscriptionDeliveryProcess(ctx
, id
, requestId
)));
699 inProgress
.push(...updatePromises
);
700 wanted
-= updatePromises
.length
;
703 this.logger
.error(_scope
, 'failed', { error: e
});
704 // do not re-throw, return what we've claimed so far
706 this.logger
.debug(_scope
, 'searched for work', { topics: topicFetchPromises
.length
, verifications: verificationPromises
.length
, updates: updatePromises
.length
, wantedRemaining: wanted
, requestId
});
714 module
.exports
= Communication
;