4 * Here we wrangle all outgoing requests, as well as the
5 * worker which initiates most of them.
8 const common
= require('./common');
9 const crypto
= require('crypto');
10 const Enum
= require('./enum');
11 const Errors
= require('./errors');
12 const Worker
= require('./worker');
13 const LinkHelper
= require('./link-helper');
14 const { version: packageVersion
, name: packageName
} = require('../package.json'); // For default UA string
16 const _fileScope
= common
.fileScope(__filename
);
19 constructor(logger
, db
, options
) {
22 this.options
= options
;
23 this.linkHelper
= new LinkHelper(logger
, options
);
25 if (this.options
.dingus
.selfBaseUrl
) {
26 this.linkHub
= `, <${this.options.dingus.selfBaseUrl}>; rel="hub"`;
29 this.logger
.error(_fileScope('constructor'), 'empty dingus.selfBaseUrl value, server responses will not be compliant');
32 this.Got
= undefined; // Will become the async imported got.
33 this.got
= this._init
; // First invocation imports got and replaces this.
35 this.worker
= new Worker(logger
, db
, this.workFeed
.bind(this), options
);
41 * Do a little dance to cope with ESM dynamic import.
42 * @param {...any} args
43 * @returns {Promise<any>}
45 async
_init(...args
) {
47 // For some reason eslint is confused about import being supported here.
48 // eslint-disable-next-line
49 this.Got
= await
import('got');
50 this.got
= this.Got
.got
.extend({
51 followRedirect: false, // Outgoing API calls should not encounter redirects
52 throwHttpErrors: false, // We will be checking status codes explicitly
54 [Enum
.Header
.UserAgent
]: Communication
.userAgentString(this.options
.userAgent
),
57 request: this.options
.communication
.requestTimeoutMs
|| 120000,
67 /* istanbul ignore if */
69 /* istanbul ignore next */
70 return this.got(...args
);
76 * Take note of transient retries.
78 * @param {*} retryCount
80 _onRetry(error
, retryCount
) {
81 const _scope
= _fileScope('_onRetry');
82 this.logger
.debug(_scope
, 'retry', { retryCount
, error
});
87 * Construct a user-agent value.
88 * @param {Object} userAgentConfig
89 * @param {String=} userAgentConfig.product
90 * @param {String=} userAgentConfig.version
91 * @param {String=} userAgentConfig.implementation
94 static userAgentString(userAgentConfig
) {
95 // eslint-disable-next-line security/detect-object-injection
96 const _conf
= (field
, def
) => (userAgentConfig
&& field
in userAgentConfig
) ? userAgentConfig
[field
] : def
;
97 const product
= _conf('product', packageName
).split('/').pop();
98 const version
= _conf('version', packageVersion
);
99 let implementation
= _conf('implementation', Enum
.Specification
);
100 if (implementation
) {
101 implementation
= ` (${implementation})`;
103 return `${product}/${version}${implementation}`;
108 * Generate a random string.
109 * @param {Integer} bytes
110 * @returns {Promise<String>}
112 static async
generateChallenge(bytes
= 30) {
113 return (await common
.randomBytesAsync(bytes
)).toString('base64');
118 * Generate the signature string for content.
119 * @param {Buffer} message
120 * @param {Buffer} secret
121 * @param {String} algorithm
// Produces the string `<algorithm>=<hmac>`, where the HMAC object is created
// from the given algorithm name and secret via crypto.createHmac.
// NOTE(review): the statements between createHmac(...) and the return are not
// visible in this view; presumably `message` is fed to the HMAC and a digest
// is taken before interpolation — confirm the digest encoding in the full file.
124 static signature(message
, secret
, algorithm
) {
125 const hmac
= crypto
.createHmac(algorithm
, secret
)
128 return `${algorithm}=${hmac}`;
133 * Generate the hash for content.
134 * @param {Buffer} content
135 * @param {String} algorithm
// Returns a value derived from crypto.createHash(algorithm).
// NOTE(review): the remainder of the call chain is not visible in this view;
// presumably `content` is hashed and the digest returned in a string
// encoding — confirm against the full file.
138 static contentHash(content
, algorithm
) {
139 return crypto
.createHash(algorithm
)
146 * Attempt to verify a requested intent with client callback endpoint.
148 * @param {*} verificationId
149 * @param {String} requestId
150 * @returns {Boolean} whether to subsequently attempt next task if verification succeeds
152 async
verificationProcess(dbCtx
, verificationId
, requestId
) {
153 const _scope
= _fileScope('verificationProcess');
155 const verification
= await
this.db
.verificationGetById(dbCtx
, verificationId
);
157 this.logger
.error(_scope
, 'no such verification', { verificationId
, requestId
});
158 throw new Errors
.InternalInconsistencyError('no such verification id');
161 const topic
= await
this.db
.topicGetById(dbCtx
, verification
.topicId
);
163 this.logger
.error(_scope
, Enum
.Message
.NoSuchTopicId
, { verification
, requestId
});
164 throw new Errors
.InternalInconsistencyError(Enum
.Message
.NoSuchTopicId
);
167 if (!topic
.isActive
) {
168 // These should be filtered out when selecting verification tasks to process.
169 this.logger
.debug(_scope
, 'topic not active, skipping verification', { verification
, requestId
});
170 await
this.db
.verificationRelease(dbCtx
, verificationId
);
174 // If topic is deleted, deny any subscriptions.
175 // Un-subscriptions can continue to be verified.
176 if (topic
.isDeleted
&& verification
.mode
=== Enum
.Mode
.Subscribe
) {
177 this.logger
.info(_scope
, 'topic is deleted, verification becomes denial', { verification
, requestId
});
179 verification
.mode
= Enum
.Mode
.Denied
;
180 verification
.reason
= 'Gone: topic no longer valid on this hub.';
181 verification
.isPublisherValidated
= true;
182 await
this.db
.verificationUpdate(dbCtx
, verification
);
185 // If verification needs publisher validation, this delivery is for publisher.
186 if (verification
.mode
=== Enum
.Mode
.Subscribe
&& verification
.isPublisherValidated
=== false) {
187 this.logger
.debug(_scope
, 'attempting publisher validation', { verification
, requestId
});
188 const continueVerification
= await
this.publisherValidate(dbCtx
, topic
, verification
);
190 // If publisher validation completed, verification will proceed.
191 // If not, we're done for now and shall try again later.
192 if (!continueVerification
) {
193 this.logger
.debug(_scope
, 'publisher validation did not complete, belaying verification', { verification
});
194 await
this.db
.verificationIncomplete(dbCtx
, verificationId
, this.options
.communication
.retryBackoffSeconds
);
199 const callbackRequestConfig
= {
201 url: new URL(verification
.callback
),
202 responseType: 'text',
204 const callbackParams
= {
205 'hub.topic': topic
.url
,
206 'hub.mode': verification
.mode
,
210 if (verification
.mode
=== Enum
.Mode
.Denied
) {
211 // Denials don't have a challenge, but might have a reason.
212 if (verification
.reason
) {
213 callbackParams
['hub.reason'] = verification
.reason
;
216 // Subscriptions and unsubscriptions require challenge matching.
217 challenge
= await Communication
.generateChallenge();
218 Object
.assign(callbackParams
, {
219 'hub.challenge': challenge
,
220 // Explicitly convert leaseSeconds to string, due to some DB backends. (Looking at you, sqlite..)
221 'hub.lease_seconds': verification
.leaseSeconds
.toString(),
224 Object
.entries(callbackParams
)
225 .forEach(([k
, v
]) => callbackRequestConfig
.url
.searchParams
.set(k
, v
))
228 const logInfoData
= {
229 callbackUrl: callbackRequestConfig
.url
.href
,
231 mode: verification
.mode
,
232 originalRequestId: verification
.requestId
,
237 this.logger
.info(_scope
, 'verification request', logInfoData
);
241 response
= await
this.got(callbackRequestConfig
);
243 this.logger
.error(_scope
, 'verification request failed', { ...logInfoData
, error: e
});
244 await
this.db
.verificationIncomplete(dbCtx
, verificationId
, this.options
.communication
.retryBackoffSeconds
);
247 logInfoData
.response
= common
.gotResponseLogData(response
);
248 this.logger
.debug(_scope
, 'verification response', logInfoData
);
250 let verificationAccepted
= true; // Presume success.
252 switch (common
.httpStatusCodeClass(response
.statusCode
)) {
254 // Success, fall out of switch.
259 this.logger
.info(_scope
, 'verification remote server error', logInfoData
);
260 await
this.db
.verificationIncomplete(dbCtx
, verificationId
, this.options
.communication
.retryBackoffSeconds
);
264 // Anything else is unsuccessful.
265 this.logger
.info(_scope
, 'verification rejected by status', logInfoData
);
266 verificationAccepted
= false;
269 // Any denial is not accepted.
270 if (verification
.mode
=== Enum
.Mode
.Denied
) {
271 this.logger
.info(_scope
, 'verification denial accepted', logInfoData
);
272 verificationAccepted
= false;
275 if ([Enum
.Mode
.Subscribe
, Enum
.Mode
.Unsubscribe
].includes(verification
.mode
)
276 && response
.body
!== challenge
) {
277 this.logger
.info(_scope
, 'verification rejected by challenge', logInfoData
);
278 verificationAccepted
= false;
281 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
282 switch (verification
.mode
) {
283 case Enum
.Mode
.Subscribe:
284 if (verificationAccepted
) {
285 await
this.db
.subscriptionUpsert(txCtx
, verification
);
289 case Enum
.Mode
.Unsubscribe:
290 if (verificationAccepted
) {
291 await
this.db
.subscriptionDelete(txCtx
, verification
.callback
, verification
.topicId
);
292 if (topic
.isDeleted
) {
293 // Remove a deleted topic after the last subscription is notified.
294 await
this.db
.topicPendingDelete(txCtx
, topic
.id
);
299 case Enum
.Mode
.Denied:
300 await
this.db
.subscriptionDelete(txCtx
, verification
.callback
, verification
.topicId
);
301 if (topic
.isDeleted
) {
302 // Remove a deleted topic after the last subscription is notified.
303 await
this.db
.topicPendingDelete(txCtx
, topic
.id
);
308 this.logger
.error(_scope
, 'unanticipated mode', { logInfoData
});
309 throw new Errors
.InternalInconsistencyError(verification
.mode
);
312 await
this.db
.verificationComplete(txCtx
, verificationId
, verification
.callback
, verification
.topicId
);
315 this.logger
.info(_scope
, 'verification complete', { ...logInfoData
, verificationAccepted
});
320 * Attempt to verify a pending subscription request with publisher.
321 * Updates (and persists) verification.
322 * Returns boolean of status of publisher contact, and hence
323 * whether to continue verification with client.
325 * This is not defined by the spec. We opt to speak JSON here.
327 * @param {TopicData} topic
328 * @param {VerificationData} verification
// Asks the topic's publisher (topic.publisherValidationUrl) whether a pending
// subscription should be allowed, then records the outcome on the verification.
// Returns false on the paths visible here where the request fails or the
// publisher reports a server error, meaning client verification should not
// continue yet.
// NOTE(review): several lines of this method (try/catch, case labels, the
// final return) are not visible in this view; the comments below describe
// only what the visible statements show.
331 async
publisherValidate(dbCtx
, topic
, verification
) {
332 const _scope
= _fileScope('publisherValidate');
333 const logInfoData
= {
335 callbackUrl: verification
.callback
,
336 requestId: verification
.requestId
,
340 this.logger
.info(_scope
, 'publisher validation request', logInfoData
);
// Request options handed to this.got; `from` and `address` are only included
// when the verification carries httpFrom / httpRemoteAddr values.
342 const publisherValidationRequestConfig
= {
344 url: topic
.publisherValidationUrl
,
346 callback: verification
.callback
,
348 ...(verification
.httpFrom
&& { from: verification
.httpFrom
}),
349 ...(verification
.httpRemoteAddr
&& { address: verification
.httpRemoteAddr
}),
351 responseType: 'json',
354 response
= await
this.got(publisherValidationRequestConfig
);
356 this.logger
.error(_scope
, 'publisher validation failed', { ...logInfoData
, error: e
});
357 return false; // Do not continue with client verification.
360 logInfoData
.response
= common
.gotResponseLogData(response
);
361 this.logger
.debug(_scope
, 'validation response', logInfoData
);
// Set when the publisher's answer changes the verification (denial), so the
// changed fields get persisted in the transaction below.
363 let verificationNeedsUpdate
= false;
364 switch (common
.httpStatusCodeClass(response
.statusCode
)) {
366 this.logger
.info(_scope
, 'publisher validation complete, allowed', logInfoData
);
370 this.logger
.info(_scope
, 'publisher validation remote server error', logInfoData
);
371 return false; // Do not continue with client verification.
374 this.logger
.info(_scope
, 'publisher validation complete, denied', logInfoData
);
375 // Change client verification
376 verification
.mode
= Enum
.Mode
.Denied
;
377 verification
.reason
= 'publisher rejected request'; // TODO: details from response?
378 verificationNeedsUpdate
= true;
381 // Success from publisher, either accepted or denied.
382 // Set validated flag, and allow client verification to continue.
383 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
384 if (verificationNeedsUpdate
) {
// NOTE(review): this call passes (txCtx, verification.id, verification), but
// verificationProcess in this file calls verificationUpdate(dbCtx, verification)
// with no separate id argument. One of the two call sites likely does not match
// the db method's signature — confirm against the db layer.
385 await
this.db
.verificationUpdate(txCtx
, verification
.id
, verification
);
387 await
this.db
.verificationValidated(txCtx
, verification
.id
);
394 * Retrieve content from a topic.
397 * @param {String} requestId
400 async
topicFetchProcess(dbCtx
, topicId
, requestId
) {
401 const _scope
= _fileScope('topicFetchProcess');
402 const logInfoData
= {
407 this.logger
.debug(_scope
, 'called', logInfoData
);
409 const topic
= await
this.db
.topicGetById(dbCtx
, topicId
);
410 if (topic
=== undefined) {
411 this.logger
.error(_scope
, Enum
.Message
.NoSuchTopicId
, logInfoData
);
412 throw new Errors
.InternalInconsistencyError(Enum
.Message
.NoSuchTopicId
);
415 // Cull any expired subscriptions
416 await
this.db
.subscriptionDeleteExpired(dbCtx
, topicId
);
418 logInfoData
.url
= topic
.url
;
420 if (topic
.isDeleted
) {
421 this.logger
.debug(_scope
, 'topic deleted, skipping update request', logInfoData
);
425 const updateRequestConfig
= {
426 followRedirect: true,
430 [Enum
.Header
.Accept
]: [topic
.contentType
, `*/*${topic.contentType ? ';q=0.9' : ''}`].filter((x
) => x
).join(', '),
431 ...(topic
.httpEtag
&& { [Enum
.Header
.IfNoneMatch
]: topic
.httpEtag
}),
432 ...(topic
.httpLastModified
&& { [Enum
.Header
.IfModifiedSince
]: topic
.httpLastModified
}),
434 responseType: 'buffer',
437 this.logger
.info(_scope
, 'topic update request', logInfoData
);
441 response
= await
this.got(updateRequestConfig
);
443 this.logger
.error(_scope
, 'update request failed', logInfoData
);
444 await
this.db
.topicFetchIncomplete(dbCtx
, topicId
, this.options
.communication
.retryBackoffSeconds
);
447 logInfoData
.response
= common
.gotResponseLogData(response
);
448 this.logger
.debug(_scope
, 'fetch response', logInfoData
);
450 switch (common
.httpStatusCodeClass(response
.statusCode
)) {
453 // Fall out of switch on success
457 this.logger
.info(_scope
, 'update remote server error', logInfoData
);
458 await
this.db
.topicFetchIncomplete(dbCtx
, topicId
, this.options
.communication
.retryBackoffSeconds
);
462 this.logger
.info(_scope
, 'fetch failed by status', logInfoData
);
463 await
this.db
.topicFetchIncomplete(dbCtx
, topicId
, this.options
.communication
.retryBackoffSeconds
);
467 if (response
.statusCode
=== 304) {
468 this.logger
.info(_scope
, 'content has not changed, per server', logInfoData
);
469 await
this.db
.topicFetchComplete(dbCtx
, topicId
);
473 const contentHash
= Communication
.contentHash(response
.body
, topic
.contentHashAlgorithm
);
474 logInfoData
.contentHash
= contentHash
;
475 if (topic
.contentHash
=== contentHash
) {
476 this.logger
.info(_scope
, 'content has not changed', logInfoData
);
477 await
this.db
.topicFetchComplete(dbCtx
, topicId
);
481 const validHub
= await
this.linkHelper
.validHub(topic
.url
, response
.headers
, response
.body
);
483 this.logger
.info(_scope
, 'retrieved topic does not list us as hub', { logInfoData
});
484 if (this.options
.communication
.strictTopicHubLink
) {
485 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
486 // Set as deleted and set content_updated so subscriptions are notified.
487 await
this.db
.topicDeleted(txCtx
, topicId
);
488 await
this.db
.topicFetchComplete(txCtx
, topicId
);
490 // Attempt to remove from db, if no active subscriptions.
491 await
this.db
.topicPendingDelete(dbCtx
, topicId
);
496 const contentType
= response
.headers
[Enum
.Header
.ContentType
.toLowerCase()];
497 const httpETag
= response
.headers
[Enum
.Header
.ETag
.toLowerCase()];
498 const httpLastModified
= response
.headers
[Enum
.Header
.LastModified
.toLowerCase()];
500 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
501 await
this.db
.topicSetContent(txCtx
, {
503 content: Buffer
.from(response
.body
),
505 ...(contentType
&& { contentType
}),
506 ...(httpETag
&& { httpETag
}),
507 ...(httpLastModified
&& { httpLastModified
}),
510 await
this.db
.topicFetchComplete(txCtx
, topicId
);
512 this.logger
.info(_scope
, 'content updated', logInfoData
);
517 * Attempt to deliver a topic's content to a subscription.
519 * @param {String} callback
521 * @param {String} requestId
523 async
subscriptionDeliveryProcess(dbCtx
, subscriptionId
, requestId
) {
524 const _scope
= _fileScope('subscriptionDeliveryProcess');
526 const logInfoData
= {
531 this.logger
.debug(_scope
, 'called', logInfoData
);
533 const subscription
= await
this.db
.subscriptionGetById(dbCtx
, subscriptionId
);
535 this.logger
.error(_scope
, 'no such subscription', logInfoData
);
536 throw new Errors
.InternalInconsistencyError('no such subscription');
539 logInfoData
.callback
= subscription
.callback
;
541 const topic
= await
this.db
.topicGetContentById(dbCtx
, subscription
.topicId
);
543 this.logger
.error(_scope
, 'no such topic', logInfoData
);
544 throw new Errors
.InternalInconsistencyError('no such topic');
547 if (topic
.isDeleted
) {
548 // If a topic has been set deleted, it does not list us as a valid hub.
549 // Queue an unsubscription.
550 const verification
= {
551 topicId: subscription
.topicId
,
552 callback: subscription
.callback
,
553 mode: Enum
.Mode
.Denied
,
554 reason: 'Gone: topic no longer valid on this hub.',
555 isPublisherValidated: true,
559 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
560 await
this.db
.verificationInsert(txCtx
, verification
);
561 await
this.db
.subscriptionDeliveryComplete(txCtx
, subscription
.callback
, subscription
.topicId
, topic
.contentUpdated
);
563 this.logger
.info(_scope
, 'update unsubscription for deleted topic', logInfoData
);
567 logInfoData
.contentLength
= topic
.content
.length
;
568 logInfoData
.contentHash
= topic
.contentHash
;
570 const updateConfig
= {
572 url: subscription
.callback
,
575 [Enum
.Header
.Link
]: `<${topic.url}>; rel="self"${this.linkHub}`,
576 [Enum
.Header
.ContentType
]: topic
.contentType
|| Enum
.ContentType
.TextPlain
,
577 ...(subscription
.secret
&& { [Enum
.Header
.XHubSignature
]: Communication
.signature(topic
.content
, subscription
.secret
, subscription
.signatureAlgorithm
) }),
579 responseType: 'text',
582 this.logger
.info(_scope
, 'update request', logInfoData
);
586 response
= await
this.got(updateConfig
);
588 this.logger
.error(_scope
, 'update request failed', { ...logInfoData
, error: e
});
589 await
this.db
.subscriptionDeliveryIncomplete(dbCtx
, subscription
.callback
, subscription
.topicId
, this.options
.communication
.retryBackoffSeconds
);
592 logInfoData
.response
= common
.gotResponseLogData(response
);
593 this.logger
.debug(_scope
, 'update response', logInfoData
);
595 switch (common
.httpStatusCodeClass(response
.statusCode
)) {
597 // Fall out of switch on success.
601 this.logger
.info(_scope
, 'update remote server error', logInfoData
);
602 await
this.db
.subscriptionDeliveryIncomplete(dbCtx
, subscription
.callback
, subscription
.topicId
, this.options
.communication
.retryBackoffSeconds
);
606 if (response
.statusCode
=== 410) { // GONE
607 this.logger
.info(_scope
, 'client declined further updates', logInfoData
);
608 await
this.db
.subscriptionDeliveryGone(dbCtx
, subscription
.callback
, subscription
.topicId
);
611 // All other 4xx falls through as failure
614 this.logger
.info(_scope
, 'update failed with non-2xx status code', logInfoData
);
615 await
this.db
.subscriptionDeliveryIncomplete(dbCtx
, subscription
.callback
, subscription
.topicId
, this.options
.communication
.retryBackoffSeconds
);
619 await
this.db
.subscriptionDeliveryComplete(dbCtx
, subscription
.callback
, subscription
.topicId
, topic
.contentUpdated
);
620 this.logger
.info(_scope
, 'update success', logInfoData
);
625 * Claim and work a specific topic fetch task.
628 * @param {String} requestId
// Claims the fetch task for a single topic through db.topicFetchClaimById,
// passing the configured claim timeout and this node's id, then runs
// topicFetchProcess for that topic.
630 async
topicFetchClaimAndProcessById(dbCtx
, topicId
, requestId
) {
631 const _scope
= _fileScope('topicFetchClaimAndProcessById');
633 const claimResult
= await
this.db
.topicFetchClaimById(dbCtx
, topicId
, this.options
.communication
.claimTimeoutSeconds
, this.options
.nodeId
);
// Anything other than exactly one changed row is treated as "claim not obtained".
// NOTE(review): loose `!=` is used here; confirm whether `changes` can be a
// non-number (e.g. string or BigInt) on some backends before tightening to `!==`.
// NOTE(review): the lines following the debug log are not visible in this view;
// presumably the method returns early here without processing — confirm.
634 if (claimResult
.changes
!= 1) {
635 this.logger
.debug(_scope
, 'did not claim topic fetch', { topicId
, requestId
});
638 await
this.topicFetchProcess(dbCtx
, topicId
, requestId
);
643 * Claim and work a specific verification confirmation task.
645 * @param {*} verificationId
646 * @param {String} requestId
// Claims a single verification task through db.verificationClaimById, passing
// the configured claim timeout and this node's id, then runs
// verificationProcess for that verification.
649 async
verificationClaimAndProcessById(dbCtx
, verificationId
, requestId
) {
650 const _scope
= _fileScope('verificationClaimAndProcessById');
652 const claimResult
= await
this.db
.verificationClaimById(dbCtx
, verificationId
, this.options
.communication
.claimTimeoutSeconds
, this.options
.nodeId
);
// Anything other than exactly one changed row is treated as "claim not obtained".
// NOTE(review): loose `!=` is used here; confirm whether `changes` can be a
// non-number (e.g. string or BigInt) on some backends before tightening to `!==`.
// NOTE(review): the lines following the debug log are not visible in this view;
// presumably the method returns early here without processing — confirm.
653 if (claimResult
.changes
!= 1) {
654 this.logger
.debug(_scope
, 'did not claim verification', { verificationId
, requestId
});
657 await
this.verificationProcess(dbCtx
, verificationId
, requestId
);
664 * @param {Number} wanted maximum tasks to claim
665 * @returns {Promise<void>[]}
667 async
workFeed(dbCtx
, wanted
) {
668 const _scope
= _fileScope('workFeed');
669 const inProgress
= [];
670 const requestId
= common
.requestId();
671 const claimTimeoutSeconds
= this.options
.communication
.claimTimeoutSeconds
;
672 const nodeId
= this.options
.nodeId
;
673 let topicFetchPromises
= [], verificationPromises
= [], updatePromises
= [];
675 this.logger
.debug(_scope
, 'called', { wanted
});
679 // Update topics before anything else.
680 const topicFetchIds
= await
this.db
.topicFetchClaim(dbCtx
, wanted
, claimTimeoutSeconds
, nodeId
);
681 topicFetchPromises
= topicFetchIds
.map((id
) => this.db
.context((ctx
) => this.topicFetchProcess(ctx
, id
, requestId
)));
682 inProgress
.push(...topicFetchPromises
);
683 wanted
-= topicFetchPromises
.length
;
687 // Then any pending verifications.
688 const verifications
= await
this.db
.verificationClaim(dbCtx
, wanted
, claimTimeoutSeconds
, nodeId
);
689 verificationPromises
= verifications
.map((id
) => this.db
.context((ctx
) => this.verificationProcess(ctx
, id
, requestId
)));
690 inProgress
.push(...verificationPromises
);
691 wanted
-= verificationPromises
.length
;
695 // Finally dole out content.
696 const updates
= await
this.db
.subscriptionDeliveryClaim(dbCtx
, wanted
, claimTimeoutSeconds
, nodeId
);
697 updatePromises
= updates
.map((id
) => this.db
.context((ctx
) => this.subscriptionDeliveryProcess(ctx
, id
, requestId
)));
698 inProgress
.push(...updatePromises
);
699 wanted
-= updatePromises
.length
;
702 this.logger
.error(_scope
, 'failed', { error: e
});
703 // do not re-throw, return what we've claimed so far
705 this.logger
.debug(_scope
, 'searched for work', { topics: topicFetchPromises
.length
, verifications: verificationPromises
.length
, updates: updatePromises
.length
, wantedRemaining: wanted
, requestId
});
713 module
.exports
= Communication
;