/**
 * Here we wrangle all outgoing requests, as well as the
 * worker which initiates most of them.
 */
8 const axios
= require('axios');
9 const common
= require('./common');
10 const crypto
= require('crypto');
11 const Enum
= require('./enum');
12 const Errors
= require('./errors');
13 const Worker
= require('./worker');
14 const LinkHelper
= require('./link-helper');
15 const { version: packageVersion
, name: packageName
} = require('../package.json'); // For default UA string
17 const { performance
} = require('perf_hooks');
19 const _fileScope
= common
.fileScope(__filename
);
class Communication {
  /**
   * Wires up the link helper, a shared axios instance, and the task worker.
   * @param {Object} logger object providing debug/info/error methods
   * @param {Object} db database interface used by all processing methods
   * @param {Object} options
   * @param {Object} options.dingus
   * @param {String} options.dingus.selfBaseUrl advertised as rel="hub" in deliveries
   * @param {Object} options.communication retryBackoffSeconds, claimTimeoutSeconds, strictTopicHubLink
   * @param {Object=} options.userAgent optional product/version/implementation overrides
   * @param {*} options.nodeId passed to the db when claiming work
   */
  constructor(logger, db, options) {
    this.logger = logger;
    this.db = db;
    this.options = options;
    this.linkHelper = new LinkHelper(logger, options);

    if (this.options.dingus.selfBaseUrl) {
      this.linkHub = `, <${this.options.dingus.selfBaseUrl}>; rel="hub"`;
    } else {
      // Keep the Link header well-formed (no literal "undefined") when no hub URL is configured.
      this.linkHub = '';
      this.logger.error(_fileScope('constructor'), 'empty dingus.selfBaseUrl value, server responses will not be compliant');
    }

    // Options common to every outgoing request.
    this.axios = axios.create({
      validateStatus: null, // Non-success responses are not exceptional
      headers: {
        [Enum.Header.UserAgent]: Communication.userAgentString(options.userAgent),
      },
    });

    // Stamp each request, then derive the elapsed time on its response.
    // Interceptors must hand the (possibly modified) object back to axios.
    this.axios.interceptors.request.use((request) => {
      request.startTimestampMs = performance.now();
      return request;
    });
    this.axios.interceptors.response.use((response) => {
      response.elapsedTimeMs = performance.now() - response.config.startTimestampMs;
      return response;
    });

    this.worker = new Worker(logger, db, this.workFeed.bind(this), options);
    // NOTE(review): starting the worker here is reconstructed from a gap in the damaged source -- confirm.
    this.worker.start();
  }


  /**
   * Build the User-Agent value, formatted as `product/version (implementation)`.
   * Fields absent from the config fall back to the package name, the package
   * version, and Enum.Specification respectively.
   * @param {Object=} userAgentConfig
   * @param {String=} userAgentConfig.product only the part after the last '/' is used
   * @param {String=} userAgentConfig.version
   * @param {String=} userAgentConfig.implementation an empty value omits the parenthetical
   * @returns {String}
   */
  static userAgentString(userAgentConfig) {
    // eslint-disable-next-line security/detect-object-injection
    const _conf = (field, def) => (userAgentConfig && field in userAgentConfig) ? userAgentConfig[field] : def;
    const product = _conf('product', packageName).split('/').pop();
    const version = _conf('version', packageVersion);
    const implementation = _conf('implementation', Enum.Specification);
    const implementationSuffix = implementation ? ` (${implementation})` : '';
    return `${product}/${version}${implementationSuffix}`;
  }


  /**
   * Generate a random string.
   * @param {Integer} bytes number of random bytes requested from common.randomBytesAsync
   * @returns {Promise<String>} base64 rendering of the random bytes
   */
  static async generateChallenge(bytes = 30) {
    const randomBytes = await common.randomBytesAsync(bytes);
    return randomBytes.toString('base64');
  }


  /**
   * Generate the signature string for content.
   * @param {Buffer} message
   * @param {Buffer} secret
   * @param {String} algorithm
   * @returns {String} `algorithm=hexdigest`
   */
  static signature(message, secret, algorithm) {
    const hmac = crypto.createHmac(algorithm, secret);
    // Without feeding the message, every signature for a secret would be identical.
    hmac.update(message);
    return `${algorithm}=${hmac.digest('hex')}`;
  }


  /**
   * Generate the hash for content.
   * @param {Buffer} content
   * @param {String} algorithm
   * @returns {String} hex digest
   */
  static contentHash(content, algorithm) {
    return crypto.createHash(algorithm).update(content).digest('hex');
  }


  /**
   * A request skeleton config.
   * Query parameters already present on requestUrl are kept; entries of
   * params are set on top of them.
   * @param {String} method
   * @param {String|URL} requestUrl
   * @param {String} body included as `data` only when truthy
   * @param {Object} params
   * @param {Object} headers
   * @returns {Object} axios request config
   */
  static _axiosConfig(method, requestUrl, body, params = {}, headers = {}) {
    const urlObj = new URL(requestUrl);
    const config = {
      method,
      url: `${urlObj.origin}${urlObj.pathname}`,
      params: urlObj.searchParams,
      headers,
      ...(body && { data: body }),
      // Setting this does not appear to be enough to keep axios from parsing JSON response into object
      responseType: 'text',
      // So force the matter by eliding all response transformations
      transformResponse: [ (res) => res ],
    };
    for (const [k, v] of Object.entries(params)) {
      config.params.set(k, v);
    }
    return config;
  }


  /**
   * Create request config for verifying an intent.
   * @param {URL} requestUrl
   * @param {String} topicUrl
   * @param {String} mode
   * @param {Integer} leaseSeconds
   * @param {String} challenge
   * @returns {Object} axios request config
   */
  static _intentVerifyAxiosConfig(requestUrl, topicUrl, mode, leaseSeconds, challenge) {
    return Communication._axiosConfig('GET', requestUrl, undefined, {
      'hub.mode': mode,
      'hub.topic': topicUrl,
      'hub.challenge': challenge,
      // Explicitly convert leaseSeconds to string, due to some DB backends. (Looking at you, sqlite..)
      'hub.lease_seconds': leaseSeconds.toString(),
    }, {});
  }


  /**
   * Create request config for denying an intent.
   * @param {String} requestUrl
   * @param {String} topicUrl
   * @param {String} reason omitted from the request when falsy
   * @returns {Object} axios request config
   */
  static _intentDenyAxiosConfig(requestUrl, topicUrl, reason) {
    return Communication._axiosConfig('GET', requestUrl, undefined, {
      'hub.mode': Enum.Mode.Denied,
      'hub.topic': topicUrl,
      ...(reason && { 'hub.reason': reason }),
    }, {});
  }


  /**
   * Create request config for querying publisher for subscription validation.
   * @param {Topic} topic
   * @param {Verification} verification
   * @returns {Object} axios request config
   */
  static _publisherValidationAxiosConfig(topic, verification) {
    const body = {
      callback: verification.callback,
      // NOTE(review): this field is reconstructed from a gap in the damaged source -- confirm against the publisher contract.
      topic: topic.url,
      ...(verification.httpFrom && { from: verification.httpFrom }),
      ...(verification.httpRemoteAddr && { address: verification.httpRemoteAddr }),
    };
    return Communication._axiosConfig('POST', topic.publisherValidationUrl, body, {}, {
      [Enum.Header.ContentType]: Enum.ContentType.ApplicationJson,
    });
  }


  /**
   * Create request config for fetching topic content.
   * Prefer existing content-type, but accept anything.
   * Conditional-request headers are added when the topic carries validators.
   * NOTE(review): this reads `topic.httpEtag` while topicFetchProcess stores
   * `httpETag` -- confirm the db layer maps between the two spellings.
   * @param {Topic} topic
   * @returns {Object} axios request config
   */
  static _topicFetchAxiosConfig(topic) {
    const acceptWildcard = '*/*' + (topic.contentType ? ';q=0.9' : '');
    const acceptPreferred = [topic.contentType, acceptWildcard].filter((x) => x).join(', ');
    return Communication._axiosConfig('GET', topic.url, undefined, {}, {
      [Enum.Header.Accept]: acceptPreferred,
      ...(topic.httpEtag && { [Enum.Header.IfNoneMatch]: topic.httpEtag }),
      ...(topic.httpLastModified && { [Enum.Header.IfModifiedSince]: topic.httpLastModified }),
    });
  }


  /**
   * Attempt to verify a requested intent with client callback endpoint.
   * Transport failures and 5xx-class responses reschedule the verification;
   * any other outcome completes it, applying the subscription change when accepted.
   * @param {*} dbCtx
   * @param {*} verificationId
   * @param {String} requestId
   * @returns {Promise<void>}
   * @throws {Errors.InternalInconsistencyError} unknown verification, topic, or mode
   */
  async verificationProcess(dbCtx, verificationId, requestId) {
    const _scope = _fileScope('verificationProcess');
    const retryBackoffSeconds = this.options.communication.retryBackoffSeconds;

    const verification = await this.db.verificationGetById(dbCtx, verificationId);
    if (!verification) {
      this.logger.error(_scope, 'no such verification', { verificationId, requestId });
      throw new Errors.InternalInconsistencyError('no such verification id');
    }

    const topic = await this.db.topicGetById(dbCtx, verification.topicId);
    if (!topic) {
      this.logger.error(_scope, Enum.Message.NoSuchTopicId, { verification, requestId });
      throw new Errors.InternalInconsistencyError(Enum.Message.NoSuchTopicId);
    }

    if (!topic.isActive) {
      // These should be filtered out when selecting verification tasks to process.
      this.logger.debug(_scope, 'topic not active, skipping verification', { verification, requestId });
      await this.db.verificationRelease(dbCtx, verificationId);
      return;
    }

    // If topic is deleted, deny any subscriptions.
    // Un-subscriptions can continue to be verified.
    if (topic.isDeleted && verification.mode === Enum.Mode.Subscribe) {
      this.logger.info(_scope, 'topic is deleted, verification becomes denial', { verification, requestId });

      verification.mode = Enum.Mode.Denied;
      verification.reason = 'Gone: topic no longer valid on this hub.';
      verification.isPublisherValidated = true;
      // Same (ctx, id, data) argument order as the update issued by publisherValidate.
      await this.db.verificationUpdate(dbCtx, verificationId, verification);
    }

    // If verification needs publisher validation, this delivery is for publisher.
    if (verification.mode === Enum.Mode.Subscribe && verification.isPublisherValidated === false) {
      this.logger.debug(_scope, 'attempting publisher validation', { verification, requestId });
      const continueVerification = await this.publisherValidate(dbCtx, topic, verification);

      // If publisher validation completed, verification will proceed.
      // If not, we're done for now and shall try again later.
      if (!continueVerification) {
        this.logger.debug(_scope, 'publisher validation did not complete, belaying verification', { verification });
        await this.db.verificationIncomplete(dbCtx, verificationId, retryBackoffSeconds);
        return;
      }
    }

    const u = new URL(verification.callback);
    let callbackRequestConfig;
    let challenge;
    if (verification.mode === Enum.Mode.Denied) {
      // Denials don't have a challenge.
      callbackRequestConfig = Communication._intentDenyAxiosConfig(u, topic.url, verification.reason);
    } else {
      // Subscriptions and unsubscriptions require challenge matching.
      challenge = await Communication.generateChallenge();
      callbackRequestConfig = Communication._intentVerifyAxiosConfig(u, topic.url, verification.mode, verification.leaseSeconds, challenge);
    }

    const logInfoData = {
      callbackUrl: u.href,
      topicUrl: topic.url,
      mode: verification.mode,
      originalRequestId: verification.requestId,
      requestId,
      verificationId,
    };

    this.logger.info(_scope, 'verification request', logInfoData);

    let response;
    try {
      response = await this.axios(callbackRequestConfig);
    } catch (e) {
      this.logger.error(_scope, 'verification request failed', { ...logInfoData, error: e });
      await this.db.verificationIncomplete(dbCtx, verificationId, retryBackoffSeconds);
      return;
    }
    logInfoData.response = common.axiosResponseLogData(response);
    this.logger.debug(_scope, 'verification response', logInfoData);

    let verificationAccepted = true; // Presume success.

    // Assumes httpStatusCodeClass yields the leading digit of the status code -- confirm in common.
    switch (common.httpStatusCodeClass(response.status)) {
      case 2:
        // Success, fall out of switch.
        break;

      case 5:
        // Remote server trouble, try again later.
        this.logger.info(_scope, 'verification remote server error', logInfoData);
        await this.db.verificationIncomplete(dbCtx, verificationId, retryBackoffSeconds);
        return;

      default:
        // Anything else is unsuccessful.
        this.logger.info(_scope, 'verification rejected by status', logInfoData);
        verificationAccepted = false;
    }

    // Any denial is not accepted.
    if (verification.mode === Enum.Mode.Denied) {
      this.logger.info(_scope, 'verification denial accepted', logInfoData);
      verificationAccepted = false;
    }

    if ([Enum.Mode.Subscribe, Enum.Mode.Unsubscribe].includes(verification.mode)
    &&  response.data !== challenge) {
      this.logger.info(_scope, 'verification rejected by challenge', logInfoData);
      verificationAccepted = false;
    }

    await this.db.transaction(dbCtx, async (txCtx) => {
      switch (verification.mode) {
        case Enum.Mode.Subscribe:
          if (verificationAccepted) {
            await this.db.subscriptionUpsert(txCtx, verification);
          }
          break;

        case Enum.Mode.Unsubscribe:
          if (verificationAccepted) {
            await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
            if (topic.isDeleted) {
              // Remove a deleted topic after the last subscription is notified.
              await this.db.topicPendingDelete(txCtx, topic.id);
            }
          }
          break;

        case Enum.Mode.Denied:
          await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
          if (topic.isDeleted) {
            // Remove a deleted topic after the last subscription is notified.
            await this.db.topicPendingDelete(txCtx, topic.id);
          }
          break;

        default:
          this.logger.error(_scope, 'unanticipated mode', logInfoData);
          throw new Errors.InternalInconsistencyError(verification.mode);
      }

      await this.db.verificationComplete(txCtx, verificationId, verification.callback, verification.topicId);
    });

    this.logger.info(_scope, 'verification complete', { ...logInfoData, verificationAccepted });
  }


  /**
   * Attempt to verify a pending subscription request with publisher.
   * Updates (and persists) verification.
   * Returns boolean of status of publisher contact, and hence
   * whether to continue verification with client.
   * A rejection by the publisher turns the verification into a denial, which
   * still continues on to the client.
   * @param {*} dbCtx
   * @param {TopicData} topic
   * @param {VerificationData} verification mutated when the publisher rejects
   * @returns {Promise<Boolean>} false if the publisher could not be reached or had a server error
   */
  async publisherValidate(dbCtx, topic, verification) {
    const _scope = _fileScope('publisherValidate');
    const publisherValidationRequestConfig = Communication._publisherValidationAxiosConfig(topic, verification);
    const logInfoData = {
      topicUrl: topic.url,
      callbackUrl: verification.callback,
      requestId: verification.requestId,
    };

    this.logger.info(_scope, 'publisher validation request', logInfoData);

    let response;
    try {
      response = await this.axios(publisherValidationRequestConfig);
    } catch (e) {
      this.logger.error(_scope, 'publisher validation failed', { ...logInfoData, error: e });
      return false; // Do not continue with client verification.
    }

    logInfoData.response = common.axiosResponseLogData(response);
    this.logger.debug(_scope, 'validation response', logInfoData);

    let verificationNeedsUpdate = false;
    // Assumes httpStatusCodeClass yields the leading digit of the status code -- confirm in common.
    switch (common.httpStatusCodeClass(response.status)) {
      case 2:
        this.logger.info(_scope, 'publisher validation complete, allowed', logInfoData);
        break;

      case 5:
        this.logger.info(_scope, 'publisher validation remote server error', logInfoData);
        return false; // Do not continue with client verification.

      default:
        this.logger.info(_scope, 'publisher validation complete, denied', logInfoData);
        // Change client verification
        verification.mode = Enum.Mode.Denied;
        verification.reason = 'publisher rejected request'; // TODO: details from response?
        verificationNeedsUpdate = true;
    }

    // Success from publisher, either accepted or denied.
    // Set validated flag, and allow client verification to continue.
    await this.db.transaction(dbCtx, async (txCtx) => {
      if (verificationNeedsUpdate) {
        await this.db.verificationUpdate(txCtx, verification.id, verification);
      }
      await this.db.verificationValidated(txCtx, verification.id);
    });

    return true;
  }


  /**
   * Retrieve content from a topic.
   * Expired subscriptions are culled first. Unchanged content (a 304 status
   * or an identical hash) only marks the fetch complete; new content is
   * stored along with its validators.
   * @param {*} dbCtx
   * @param {*} topicId
   * @param {String} requestId
   * @returns {Promise<void>}
   * @throws {Errors.InternalInconsistencyError} unknown topic
   */
  async topicFetchProcess(dbCtx, topicId, requestId) {
    const _scope = _fileScope('topicFetchProcess');
    const retryBackoffSeconds = this.options.communication.retryBackoffSeconds;
    const logInfoData = {
      topicId,
      requestId,
    };

    this.logger.debug(_scope, 'called', logInfoData);

    const topic = await this.db.topicGetById(dbCtx, topicId);
    if (topic === undefined) {
      this.logger.error(_scope, Enum.Message.NoSuchTopicId, logInfoData);
      throw new Errors.InternalInconsistencyError(Enum.Message.NoSuchTopicId);
    }

    // Cull any expired subscriptions
    await this.db.subscriptionDeleteExpired(dbCtx, topicId);

    logInfoData.url = topic.url;

    if (topic.isDeleted) {
      this.logger.debug(_scope, 'topic deleted, skipping update request', logInfoData);
      return;
    }

    const updateRequestConfig = Communication._topicFetchAxiosConfig(topic);

    this.logger.info(_scope, 'topic update request', logInfoData);

    let response;
    try {
      response = await this.axios(updateRequestConfig);
    } catch (e) {
      // Include the failure itself, as the other request paths do.
      this.logger.error(_scope, 'update request failed', { ...logInfoData, error: e });
      await this.db.topicFetchIncomplete(dbCtx, topicId, retryBackoffSeconds);
      return;
    }
    logInfoData.response = common.axiosResponseLogData(response);
    this.logger.debug(_scope, 'fetch response', logInfoData);

    // Assumes httpStatusCodeClass yields the leading digit of the status code -- confirm in common.
    switch (common.httpStatusCodeClass(response.status)) {
      case 2:
      case 3: // Includes 304, which is handled just below.
        // Fall out of switch on success
        break;

      case 5:
        this.logger.info(_scope, 'update remote server error', logInfoData);
        await this.db.topicFetchIncomplete(dbCtx, topicId, retryBackoffSeconds);
        return;

      default:
        this.logger.info(_scope, 'fetch failed by status', logInfoData);
        await this.db.topicFetchIncomplete(dbCtx, topicId, retryBackoffSeconds);
        return;
    }

    if (response.status === 304) {
      this.logger.info(_scope, 'content has not changed, per server', logInfoData);
      await this.db.topicFetchComplete(dbCtx, topicId);
      return;
    }

    const contentHash = Communication.contentHash(response.data, topic.contentHashAlgorithm);
    logInfoData.contentHash = contentHash;
    if (topic.contentHash === contentHash) {
      this.logger.info(_scope, 'content has not changed', logInfoData);
      await this.db.topicFetchComplete(dbCtx, topicId);
      return;
    }

    const validHub = await this.linkHelper.validHub(topic.url, response.headers, response.data);
    if (!validHub) {
      this.logger.info(_scope, 'retrieved topic does not list us as hub', logInfoData);
      if (this.options.communication.strictTopicHubLink) {
        await this.db.transaction(dbCtx, async (txCtx) => {
          // Set as deleted and set content_updated so subscriptions are notified.
          await this.db.topicDeleted(txCtx, topicId);
          await this.db.topicFetchComplete(txCtx, topicId);
        });
        // Attempt to remove from db, if no active subscriptions.
        await this.db.topicPendingDelete(dbCtx, topicId);
        return;
      }
    }

    const contentType = response.headers[Enum.Header.ContentType.toLowerCase()];
    const httpETag = response.headers[Enum.Header.ETag.toLowerCase()];
    const httpLastModified = response.headers[Enum.Header.LastModified.toLowerCase()];

    await this.db.transaction(dbCtx, async (txCtx) => {
      await this.db.topicSetContent(txCtx, {
        topicId,
        content: Buffer.from(response.data),
        contentHash,
        ...(contentType && { contentType }),
        ...(httpETag && { httpETag }),
        ...(httpLastModified && { httpLastModified }),
      });

      await this.db.topicFetchComplete(txCtx, topicId);
    });
    this.logger.info(_scope, 'content updated', logInfoData);
  }


  /**
   * Attempt to deliver a topic's content to a subscription.
   * For a deleted topic, a denial verification is queued in place of delivery.
   * A 410 response is recorded via subscriptionDeliveryGone; transport errors
   * and every other non-2xx-class response reschedule the delivery.
   * @param {*} dbCtx
   * @param {*} subscriptionId
   * @param {String} requestId
   * @returns {Promise<void>}
   * @throws {Errors.InternalInconsistencyError} unknown subscription or topic
   */
  async subscriptionDeliveryProcess(dbCtx, subscriptionId, requestId) {
    const _scope = _fileScope('subscriptionDeliveryProcess');
    const retryBackoffSeconds = this.options.communication.retryBackoffSeconds;

    const logInfoData = {
      subscriptionId,
      requestId,
    };

    this.logger.debug(_scope, 'called', logInfoData);

    const subscription = await this.db.subscriptionGetById(dbCtx, subscriptionId);
    if (!subscription) {
      this.logger.error(_scope, 'no such subscription', logInfoData);
      throw new Errors.InternalInconsistencyError('no such subscription');
    }

    logInfoData.callback = subscription.callback;

    const topic = await this.db.topicGetContentById(dbCtx, subscription.topicId);
    if (!topic) {
      this.logger.error(_scope, 'no such topic', logInfoData);
      throw new Errors.InternalInconsistencyError('no such topic');
    }

    if (topic.isDeleted) {
      // If a topic has been set deleted, it does not list us as a valid hub.
      // Queue an unsubscription.
      const verification = {
        topicId: subscription.topicId,
        callback: subscription.callback,
        mode: Enum.Mode.Denied,
        reason: 'Gone: topic no longer valid on this hub.',
        isPublisherValidated: true,
        // NOTE(review): this field is reconstructed from a gap in the damaged source -- confirm.
        requestId,
      };

      await this.db.transaction(dbCtx, async (txCtx) => {
        await this.db.verificationInsert(txCtx, verification);
        await this.db.subscriptionDeliveryComplete(txCtx, subscription.callback, subscription.topicId, topic.contentUpdated);
      });
      this.logger.info(_scope, 'update unsubscription for deleted topic', logInfoData);
      return;
    }

    logInfoData.contentLength = topic.content.length;
    logInfoData.contentHash = topic.contentHash;

    const updateAxiosConfig = Communication._axiosConfig('POST', subscription.callback, topic.content, {}, {
      [Enum.Header.Link]: `<${topic.url}>; rel="self"${this.linkHub}`,
      [Enum.Header.ContentType]: topic.contentType || Enum.ContentType.TextPlain,
      ...(subscription.secret && { [Enum.Header.XHubSignature]: Communication.signature(topic.content, subscription.secret, subscription.signatureAlgorithm) }),
    });

    this.logger.info(_scope, 'update request', logInfoData);

    let response;
    try {
      response = await this.axios(updateAxiosConfig);
    } catch (e) {
      this.logger.error(_scope, 'update request failed', { ...logInfoData, error: e });
      await this.db.subscriptionDeliveryIncomplete(dbCtx, subscription.callback, subscription.topicId, retryBackoffSeconds);
      return;
    }
    logInfoData.response = common.axiosResponseLogData(response);
    this.logger.debug(_scope, 'update response', logInfoData);

    // Assumes httpStatusCodeClass yields the leading digit of the status code -- confirm in common.
    switch (common.httpStatusCodeClass(response.status)) {
      case 2:
        // Fall out of switch on success.
        break;

      case 5:
        this.logger.info(_scope, 'update remote server error', logInfoData);
        await this.db.subscriptionDeliveryIncomplete(dbCtx, subscription.callback, subscription.topicId, retryBackoffSeconds);
        return;

      case 4:
        if (response.status === 410) { // GONE
          this.logger.info(_scope, 'client declined further updates', logInfoData);
          await this.db.subscriptionDeliveryGone(dbCtx, subscription.callback, subscription.topicId);
          return;
        }
        // All other 4xx falls through as failure

      default: // eslint-disable-line no-fallthrough
        this.logger.info(_scope, 'update failed with non-2xx status code', logInfoData);
        await this.db.subscriptionDeliveryIncomplete(dbCtx, subscription.callback, subscription.topicId, retryBackoffSeconds);
        return;
    }

    await this.db.subscriptionDeliveryComplete(dbCtx, subscription.callback, subscription.topicId, topic.contentUpdated);
    this.logger.info(_scope, 'update success', logInfoData);
  }


  /**
   * Claim and work a specific topic fetch task.
   * Does nothing further unless exactly one record was claimed.
   * @param {*} dbCtx
   * @param {*} topicId
   * @param {String} requestId
   * @returns {Promise<void>}
   */
  async topicFetchClaimAndProcessById(dbCtx, topicId, requestId) {
    const _scope = _fileScope('topicFetchClaimAndProcessById');

    const claimResult = await this.db.topicFetchClaimById(dbCtx, topicId, this.options.communication.claimTimeoutSeconds, this.options.nodeId);
    // Loose comparison kept on purpose: the type of `changes` depends on the db backend.
    if (claimResult.changes != 1) { // eslint-disable-line eqeqeq
      this.logger.debug(_scope, 'did not claim topic fetch', { topicId, requestId });
      return;
    }
    await this.topicFetchProcess(dbCtx, topicId, requestId);
  }


  /**
   * Claim and work a specific verification confirmation task.
   * Does nothing further unless exactly one record was claimed.
   * @param {*} dbCtx
   * @param {*} verificationId
   * @param {String} requestId
   * @returns {Promise<void>}
   */
  async verificationClaimAndProcessById(dbCtx, verificationId, requestId) {
    const _scope = _fileScope('verificationClaimAndProcessById');

    const claimResult = await this.db.verificationClaimById(dbCtx, verificationId, this.options.communication.claimTimeoutSeconds, this.options.nodeId);
    // Loose comparison kept on purpose: the type of `changes` depends on the db backend.
    if (claimResult.changes != 1) { // eslint-disable-line eqeqeq
      this.logger.debug(_scope, 'did not claim verification', { verificationId, requestId });
      return;
    }
    await this.verificationProcess(dbCtx, verificationId, requestId);
  }


  /**
   * Claim up to `wanted` tasks and start processing them, in priority order:
   * topic fetches, then verifications, then content deliveries.
   * Each claimed task runs in its own db context. A failure while claiming
   * is logged, and whatever was already started is still returned.
   * @param {*} dbCtx
   * @param {Number} wanted maximum tasks to claim
   * @returns {Promise<Promise<void>[]>} the in-flight task promises
   */
  async workFeed(dbCtx, wanted) {
    const _scope = _fileScope('workFeed');
    const inProgress = [];
    const requestId = common.requestId();
    const claimTimeoutSeconds = this.options.communication.claimTimeoutSeconds;
    const nodeId = this.options.nodeId;
    let topicFetchPromises = [];
    let verificationPromises = [];
    let updatePromises = [];

    this.logger.debug(_scope, 'called', { wanted });

    try {
      if (wanted > 0) {
        // Update topics before anything else.
        const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
        topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId)));
        inProgress.push(...topicFetchPromises);
        wanted -= topicFetchPromises.length;
      }

      if (wanted > 0) {
        // Then any pending verifications.
        const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
        verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId)));
        inProgress.push(...verificationPromises);
        wanted -= verificationPromises.length;
      }

      if (wanted > 0) {
        // Finally dole out content.
        const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
        updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId)));
        inProgress.push(...updatePromises);
        wanted -= updatePromises.length;
      }
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e });
      // do not re-throw, return what we've claimed so far
    }
    this.logger.debug(_scope, 'searched for work', { topics: topicFetchPromises.length, verifications: verificationPromises.length, updates: updatePromises.length, wantedRemaining: wanted, requestId });

    return inProgress;
  }

}
727 module
.exports
= Communication
;