/**
 * Here we wrangle all outgoing requests, as well as the
 * worker which initiates most of them.
 */
8 const common
= require('./common');
9 const crypto
= require('crypto');
10 const Enum
= require('./enum');
11 const Errors
= require('./errors');
12 const Worker
= require('./worker');
13 const LinkHelper
= require('./link-helper');
14 const { version: packageVersion
, name: packageName
} = require('../package.json'); // For default UA string
16 const _fileScope
= common
.fileScope(__filename
);
// Build the outgoing-communication handler: keep the options, create the link helper,
// prepare the rel="hub" Link header suffix, set up lazy loading of got, and create the worker.
// NOTE(review): some original lines of this constructor are not visible in this excerpt
// (e.g. nothing shown assigns this.logger or this.db, which the methods use) — confirm against the full file.
19 constructor(logger
, db
, options
) {
22 this.options
= options
;
23 this.linkHelper
= new LinkHelper(logger
, options
);
// linkHub is appended after the rel="self" entry of the Link header sent with content deliveries.
25 if (this.options
.dingus
.selfBaseUrl
) {
26 this.linkHub
= `, <${this.options.dingus.selfBaseUrl}>; rel="hub"`;
// NOTE(review): presumably the else-branch of the selfBaseUrl check; the branch lines are not visible here.
29 this.logger
.error(_fileScope('constructor'), 'empty dingus.selfBaseUrl value, server responses will not be compliant');
32 this.Got
= undefined; // Will become the async imported got.
33 this.got
= this._init
; // First invocation imports got and replaces this.
// The worker receives workFeed bound to this instance — presumably as its source of work; see Worker.
35 this.worker
= new Worker(logger
, db
, this.workFeed
.bind(this), options
);
/**
 * Do a little dance to cope with ESM dynamic import.
 * @param {...any} args arguments
 * @returns {Promise<any>} got response
 */
// Stand-in for this.got until got has been loaded: dynamically imports the got module,
// builds an extended instance with hub-wide defaults, stores it as this.got, and then
// forwards the call's arguments to that instance.
// NOTE(review): the enclosing option keys for some entries below, and the guards that the
// istanbul markers refer to, are not visible in this excerpt.
45 async
_init(...args
) {
47 // For some reason eslint is confused about import being supported here.
49 this.Got
= await
import('got');
50 this.got
= this.Got
.got
.extend({
51 followRedirect: false, // Outgoing API calls should not encounter redirects
52 throwHttpErrors: false, // We will be checking status codes explicitly
// User-Agent value is derived from the configured userAgent options.
54 [Enum
.Header
.UserAgent
]: Communication
.userAgentString(this.options
.userAgent
),
// Falls back to 120000 when requestTimeoutMs is unset or otherwise falsy.
57 request: this.options
.communication
.requestTimeoutMs
|| 120000,
67 /* istanbul ignore if */
69 /* istanbul ignore next */
// this.got now refers to the extended instance, so this performs the actual request.
70 return this.got(...args
);
76 * Take note of transient retries.
77 * @param {*} error error
78 * @param {*} retryCount retry count
80 _onRetry(error
, retryCount
) {
81 const _scope
= _fileScope('_onRetry');
82 this.logger
.debug(_scope
, 'retry', { retryCount
, error
});
87 * Construct a user-agent value.
88 * @param {object} userAgentConfig user agent config
89 * @param {string=} userAgentConfig.product product name (default package name)
90 * @param {string=} userAgentConfig.version version (default package version)
91 * @param {string=} userAgentConfig.implementation implementation (default spec supported)
92 * @returns {string} user agent string 'product/version (implementation)'
94 static userAgentString(userAgentConfig
) {
95 // eslint-disable-next-line security/detect-object-injection
96 const _conf
= (field
, def
) => (userAgentConfig
&& field
in userAgentConfig
) ? userAgentConfig
[field
] : def
;
97 const product
= _conf('product', packageName
).split('/').pop();
98 const version
= _conf('version', packageVersion
);
99 let implementation
= _conf('implementation', Enum
.Specification
);
100 if (implementation
) {
101 implementation
= ` (${implementation})`;
103 return `${product}/${version}${implementation}`;
/**
 * @alias {number} Integer
 */
111 * Generate a random string.
112 * @param {Integer} bytes size of challenge
113 * @returns {Promise<string>} base64 randomness
115 static async
generateChallenge(bytes
= 30) {
116 return (await common
.randomBytesAsync(bytes
)).toString('base64');
121 * Generate the signature string for content.
122 * @param {Buffer} message message to sign
123 * @param {Buffer} secret secret to sign with
124 * @param {string} algorithm algorithm to sign with
125 * @returns {string} signature string
127 static signature(message
, secret
, algorithm
) {
128 const hmac
= crypto
.createHmac(algorithm
, secret
)
131 return `${algorithm}=${hmac}`;
136 * Generate the hash for content.
137 * @param {Buffer} content content
138 * @param {string} algorithm algorithm
139 * @returns {string} hash of content
141 static contentHash(content
, algorithm
) {
142 return crypto
.createHash(algorithm
)
/**
 * Attempt to verify a requested intent with client callback endpoint.
 * @param {*} dbCtx db context
 * @param {*} verificationId verification id
 * @param {string} requestId request id
 * @returns {Promise<boolean>} whether to subsequently attempt next task if verification succeeds
 */
// Work one verification: load it and its topic, optionally validate with the publisher
// first, send the verification request to the subscriber's callback URL, then record
// the outcome in the database.
// NOTE(review): several control-flow lines (guards, try/catch, case labels, returns,
// closing braces) and some object-literal entries are not visible in this excerpt;
// the comments below describe only the statements shown.
155 async
verificationProcess(dbCtx
, verificationId
, requestId
) {
156 const _scope
= _fileScope('verificationProcess');
// Load the verification record to be worked.
158 const verification
= await
this.db
.verificationGetById(dbCtx
, verificationId
);
// Missing-verification path: log and raise an internal inconsistency.
160 this.logger
.error(_scope
, 'no such verification', { verificationId
, requestId
});
161 throw new Errors
.InternalInconsistencyError('no such verification id');
// Load the topic this verification refers to.
164 const topic
= await
this.db
.topicGetById(dbCtx
, verification
.topicId
);
// Missing-topic path: log and raise an internal inconsistency.
166 this.logger
.error(_scope
, Enum
.Message
.NoSuchTopicId
, { verification
, requestId
});
167 throw new Errors
.InternalInconsistencyError(Enum
.Message
.NoSuchTopicId
);
170 if (!topic
.isActive
) {
171 // These should be filtered out when selecting verification tasks to process.
172 this.logger
.debug(_scope
, 'topic not active, skipping verification', { verification
, requestId
});
// Release the verification rather than contacting the subscriber.
173 await
this.db
.verificationRelease(dbCtx
, verificationId
);
177 // If topic is deleted, deny any subscriptions.
178 // Un-subscriptions can continue to be verified.
179 if (topic
.isDeleted
&& verification
.mode
=== Enum
.Mode
.Subscribe
) {
180 this.logger
.info(_scope
, 'topic is deleted, verification becomes denial', { verification
, requestId
});
// Rewrite the request as a denial with a reason, flag it as publisher-validated, and persist it.
182 verification
.mode
= Enum
.Mode
.Denied
;
183 verification
.reason
= 'Gone: topic no longer valid on this hub.';
184 verification
.isPublisherValidated
= true;
// NOTE(review): called here as (ctx, verification), but as (ctx, id, verification) in
// publisherValidate — confirm which signature verificationUpdate expects.
185 await
this.db
.verificationUpdate(dbCtx
, verification
);
188 // If verification needs publisher validation, this delivery is for publisher.
189 if (verification
.mode
=== Enum
.Mode
.Subscribe
&& verification
.isPublisherValidated
=== false) {
190 this.logger
.debug(_scope
, 'attempting publisher validation', { verification
, requestId
});
191 const continueVerification
= await
this.publisherValidate(dbCtx
, topic
, verification
);
193 // If publisher validation completed, verification will proceed.
194 // If not, we're done for now and shall try again later.
195 if (!continueVerification
) {
196 this.logger
.debug(_scope
, 'publisher validation did not complete, belaying verification', { verification
});
// Record the verification as incomplete, passing the configured retry backoff.
197 await
this.db
.verificationIncomplete(dbCtx
, verificationId
, this.options
.communication
.retryBackoffSeconds
);
// Request sent to the subscriber callback; the body is read as text because it is
// later compared against the challenge.
202 const callbackRequestConfig
= {
204 url: new URL(verification
.callback
),
205 responseType: 'text',
// Query parameters to be added to the callback URL.
207 const callbackParams
= {
208 'hub.topic': topic
.url
,
209 'hub.mode': verification
.mode
,
213 if (verification
.mode
=== Enum
.Mode
.Denied
) {
214 // Denials don't have a challenge, but might have a reason.
215 if (verification
.reason
) {
216 callbackParams
['hub.reason'] = verification
.reason
;
219 // Subscriptions and unsubscriptions require challenge matching.
// The challenge is random; the response body must equal it for the verification to be accepted.
220 challenge
= await Communication
.generateChallenge();
221 Object
.assign(callbackParams
, {
222 'hub.challenge': challenge
,
223 // Explicitly convert leaseSeconds to string, due to some DB backends. (Looking at you, sqlite..)
224 'hub.lease_seconds': verification
.leaseSeconds
.toString(),
// Set each parameter on the callback URL's query string; set() replaces any existing same-named parameter.
227 Object
.entries(callbackParams
)
228 .forEach(([k
, v
]) => callbackRequestConfig
.url
.searchParams
.set(k
, v
))
// Fields shared by the log entries that follow.
231 const logInfoData
= {
232 callbackUrl: callbackRequestConfig
.url
.href
,
234 mode: verification
.mode
,
235 originalRequestId: verification
.requestId
,
240 this.logger
.info(_scope
, 'verification request', logInfoData
);
// Send the verification request to the subscriber.
244 response
= await
this.got(callbackRequestConfig
);
// Request-failure path: log the error and record the verification as incomplete with the retry backoff.
246 this.logger
.error(_scope
, 'verification request failed', { ...logInfoData
, error: e
});
247 await
this.db
.verificationIncomplete(dbCtx
, verificationId
, this.options
.communication
.retryBackoffSeconds
);
// Attach a loggable form of the response to the log data.
250 logInfoData
.response
= common
.gotResponseLogData(response
);
251 this.logger
.debug(_scope
, 'verification response', logInfoData
);
253 let verificationAccepted
= true; // Presume success.
// Classify the response by HTTP status class (case labels are not visible in this excerpt).
255 switch (common
.httpStatusCodeClass(response
.statusCode
)) {
257 // Success, fall out of switch.
// Remote-server-error path: record the verification as incomplete with the retry backoff.
262 this.logger
.info(_scope
, 'verification remote server error', logInfoData
);
263 await
this.db
.verificationIncomplete(dbCtx
, verificationId
, this.options
.communication
.retryBackoffSeconds
);
267 // Anything else is unsuccessful.
268 this.logger
.info(_scope
, 'verification rejected by status', logInfoData
);
269 verificationAccepted
= false;
272 // Any denial is not accepted.
273 if (verification
.mode
=== Enum
.Mode
.Denied
) {
274 this.logger
.info(_scope
, 'verification denial accepted', logInfoData
);
275 verificationAccepted
= false;
// For subscribe/unsubscribe, the response body must be strictly equal to the challenge sent.
278 if ([Enum
.Mode
.Subscribe
, Enum
.Mode
.Unsubscribe
].includes(verification
.mode
)
279 && response
.body
!== challenge
) {
280 this.logger
.info(_scope
, 'verification rejected by challenge', logInfoData
);
281 verificationAccepted
= false;
// Apply the outcome for this mode and mark the verification complete, all within one transaction.
284 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
285 switch (verification
.mode
) {
286 case Enum
.Mode
.Subscribe:
// An accepted subscribe creates or updates the subscription from the verification data.
287 if (verificationAccepted
) {
288 await
this.db
.subscriptionUpsert(txCtx
, verification
);
292 case Enum
.Mode
.Unsubscribe:
// An accepted unsubscribe removes the subscription for this callback and topic.
293 if (verificationAccepted
) {
294 await
this.db
.subscriptionDelete(txCtx
, verification
.callback
, verification
.topicId
);
295 if (topic
.isDeleted
) {
296 // Remove a deleted topic after the last subscription is notified.
297 await
this.db
.topicPendingDelete(txCtx
, topic
.id
);
302 case Enum
.Mode
.Denied:
// A denial removes the subscription without consulting verificationAccepted.
303 await
this.db
.subscriptionDelete(txCtx
, verification
.callback
, verification
.topicId
);
304 if (topic
.isDeleted
) {
305 // Remove a deleted topic after the last subscription is notified.
306 await
this.db
.topicPendingDelete(txCtx
, topic
.id
);
// Unrecognized-mode path: log and raise an internal inconsistency carrying the mode.
// NOTE(review): logInfoData is wrapped in an object here, unlike the other log calls in this method.
311 this.logger
.error(_scope
, 'unanticipated mode', { logInfoData
});
312 throw new Errors
.InternalInconsistencyError(verification
.mode
);
// Mark this verification complete for its callback and topic.
315 await
this.db
.verificationComplete(txCtx
, verificationId
, verification
.callback
, verification
.topicId
);
318 this.logger
.info(_scope
, 'verification complete', { ...logInfoData
, verificationAccepted
});
/**
 * @alias {object} TopicData
 * @alias {object} VerificationData
 */
/**
 * Attempt to verify a pending subscription request with publisher.
 * Updates (and persists) verification.
 * Returns boolean of status of publisher contact, and hence
 * whether to continue verification with client.
 *
 * This is not defined by the spec.  We opt to speak JSON here.
 * @param {*} dbCtx db context
 * @param {TopicData} topic topic
 * @param {VerificationData} verification verification
 * @returns {Promise<boolean>} true if successful contact with publisher
 */
// Ask the topic's publisher validation URL about a subscription request, convert the
// verification into a denial if the publisher refuses, and record that validation happened.
// NOTE(review): try/catch lines, switch case labels, some object-literal entries and the
// final return are not visible in this excerpt; comments describe only what is shown.
338 async
publisherValidate(dbCtx
, topic
, verification
) {
339 const _scope
= _fileScope('publisherValidate');
// Fields shared by the log entries that follow.
340 const logInfoData
= {
342 callbackUrl: verification
.callback
,
343 requestId: verification
.requestId
,
347 this.logger
.info(_scope
, 'publisher validation request', logInfoData
);
// Request to the publisher; the response is parsed as JSON.
349 const publisherValidationRequestConfig
= {
351 url: topic
.publisherValidationUrl
,
353 callback: verification
.callback
,
// from/address are included only when the verification carries those values.
355 ...(verification
.httpFrom
&& { from: verification
.httpFrom
}),
356 ...(verification
.httpRemoteAddr
&& { address: verification
.httpRemoteAddr
}),
358 responseType: 'json',
// Send the validation request to the publisher.
361 response
= await
this.got(publisherValidationRequestConfig
);
// Request-failure path: log the error and report that publisher contact did not succeed.
363 this.logger
.error(_scope
, 'publisher validation failed', { ...logInfoData
, error: e
});
364 return false; // Do not continue with client verification.
// Attach a loggable form of the response to the log data.
367 logInfoData
.response
= common
.gotResponseLogData(response
);
368 this.logger
.debug(_scope
, 'validation response', logInfoData
);
// Set only when the publisher's answer changes the verification (the denial path below).
370 let verificationNeedsUpdate
= false;
// Classify the response by HTTP status class (case labels are not visible in this excerpt).
371 switch (common
.httpStatusCodeClass(response
.statusCode
)) {
373 this.logger
.info(_scope
, 'publisher validation complete, allowed', logInfoData
);
377 this.logger
.info(_scope
, 'publisher validation remote server error', logInfoData
);
378 return false; // Do not continue with client verification.
381 this.logger
.info(_scope
, 'publisher validation complete, denied', logInfoData
);
382 // Change client verification
383 verification
.mode
= Enum
.Mode
.Denied
;
384 verification
.reason
= 'publisher rejected request'; // TODO: details from response?
385 verificationNeedsUpdate
= true;
388 // Success from publisher, either accepted or denied.
389 // Set validated flag, and allow client verification to continue.
// Both writes happen in one transaction.
390 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
391 if (verificationNeedsUpdate
) {
// NOTE(review): called here as (ctx, id, verification), but as (ctx, verification) in
// verificationProcess — confirm which signature verificationUpdate expects.
392 await
this.db
.verificationUpdate(txCtx
, verification
.id
, verification
);
394 await
this.db
.verificationValidated(txCtx
, verification
.id
);
/**
 * Retrieve content from a topic.
 * @param {*} dbCtx db context
 * @param {*} topicId topic id
 * @param {string} requestId request id
 * @returns {Promise<void>}
 */
// Work one topic fetch: load the topic, cull its expired subscriptions, request its
// current content, and either record that nothing changed, handle a topic that does
// not list this hub, or store the new content.
// NOTE(review): try/catch lines, switch case labels, returns, closing braces and some
// object-literal entries are not visible in this excerpt; comments describe only what is shown.
407 async
topicFetchProcess(dbCtx
, topicId
, requestId
) {
408 const _scope
= _fileScope('topicFetchProcess');
// Fields shared by the log entries that follow; more are added as they become known.
409 const logInfoData
= {
414 this.logger
.debug(_scope
, 'called', logInfoData
);
416 const topic
= await
this.db
.topicGetById(dbCtx
, topicId
);
// An unknown topic id is logged and raised as an internal inconsistency.
417 if (topic
=== undefined) {
418 this.logger
.error(_scope
, Enum
.Message
.NoSuchTopicId
, logInfoData
);
419 throw new Errors
.InternalInconsistencyError(Enum
.Message
.NoSuchTopicId
);
422 // Cull any expired subscriptions
423 await
this.db
.subscriptionDeleteExpired(dbCtx
, topicId
);
425 logInfoData
.url
= topic
.url
;
427 if (topic
.isDeleted
) {
428 this.logger
.debug(_scope
, 'topic deleted, skipping update request', logInfoData
);
// Redirects are followed for this request, and the body is read as a Buffer.
432 const updateRequestConfig
= {
433 followRedirect: true,
// Accept lists the topic's known content type first (when there is one), then */*,
// which carries q=0.9 only when a content type is known.
437 [Enum
.Header
.Accept
]: [topic
.contentType
, `*/*${topic.contentType ? ';q=0.9' : ''}`].filter((x
) => x
).join(', '),
// Conditional-request headers are sent only when the topic has stored validator values.
438 ...(topic
.httpEtag
&& { [Enum
.Header
.IfNoneMatch
]: topic
.httpEtag
}),
439 ...(topic
.httpLastModified
&& { [Enum
.Header
.IfModifiedSince
]: topic
.httpLastModified
}),
441 responseType: 'buffer',
444 this.logger
.info(_scope
, 'topic update request', logInfoData
);
// Send the update request.
448 response
= await
this.got(updateRequestConfig
);
// Request-failure path: log the error and record the fetch as incomplete with the retry backoff.
450 this.logger
.error(_scope
, 'update request failed', { ...logInfoData
, error: e
});
451 await
this.db
.topicFetchIncomplete(dbCtx
, topicId
, this.options
.communication
.retryBackoffSeconds
);
// Attach a loggable form of the response to the log data.
454 logInfoData
.response
= common
.gotResponseLogData(response
);
455 this.logger
.debug(_scope
, 'fetch response', logInfoData
);
// Classify the response by HTTP status class (case labels are not visible in this excerpt).
457 switch (common
.httpStatusCodeClass(response
.statusCode
)) {
460 // Fall out of switch on success
// Remote-server-error path: record the fetch as incomplete with the retry backoff.
464 this.logger
.info(_scope
, 'update remote server error', logInfoData
);
465 await
this.db
.topicFetchIncomplete(dbCtx
, topicId
, this.options
.communication
.retryBackoffSeconds
);
// Other failing statuses are also recorded as incomplete with the retry backoff.
469 this.logger
.info(_scope
, 'fetch failed by status', logInfoData
);
470 await
this.db
.topicFetchIncomplete(dbCtx
, topicId
, this.options
.communication
.retryBackoffSeconds
);
// 304 means the server reports no change: complete the fetch without touching content.
474 if (response
.statusCode
=== 304) {
475 this.logger
.info(_scope
, 'content has not changed, per server', logInfoData
);
476 await
this.db
.topicFetchComplete(dbCtx
, topicId
);
// Compare a hash of the fetched body with the stored hash to detect unchanged content.
480 const contentHash
= Communication
.contentHash(response
.body
, topic
.contentHashAlgorithm
);
481 logInfoData
.contentHash
= contentHash
;
482 if (topic
.contentHash
=== contentHash
) {
483 this.logger
.info(_scope
, 'content has not changed', logInfoData
);
484 await
this.db
.topicFetchComplete(dbCtx
, topicId
);
// Check the response headers and body for a hub link to this hub.
488 const validHub
= await
this.linkHelper
.validHub(topic
.url
, response
.headers
, response
.body
);
// NOTE(review): logInfoData is wrapped in an object here, unlike the other log calls in this method.
490 this.logger
.info(_scope
, 'retrieved topic does not list us as hub', { logInfoData
});
// Only with strictTopicHubLink enabled is the topic marked deleted.
491 if (this.options
.communication
.strictTopicHubLink
) {
492 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
493 // Set as deleted and set content_updated so subscriptions are notified.
494 await
this.db
.topicDeleted(txCtx
, topicId
);
495 await
this.db
.topicFetchComplete(txCtx
, topicId
);
497 // Attempt to remove from db, if no active subscriptions.
498 await
this.db
.topicPendingDelete(dbCtx
, topicId
);
// Response header names are looked up in lower case.
503 const contentType
= response
.headers
[Enum
.Header
.ContentType
.toLowerCase()];
504 const httpETag
= response
.headers
[Enum
.Header
.ETag
.toLowerCase()];
505 const httpLastModified
= response
.headers
[Enum
.Header
.LastModified
.toLowerCase()];
// Store the new content and complete the fetch in one transaction.
507 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
508 await
this.db
.topicSetContent(txCtx
, {
510 content: Buffer
.from(response
.body
),
// Metadata keys are included only when the matching response header was present.
// NOTE(review): the key written here is httpETag, while the request headers above read
// topic.httpEtag — confirm the database layer maps between the two spellings.
512 ...(contentType
&& { contentType
}),
513 ...(httpETag
&& { httpETag
}),
514 ...(httpLastModified
&& { httpLastModified
}),
517 await
this.db
.topicFetchComplete(txCtx
, topicId
);
519 this.logger
.info(_scope
, 'content updated', logInfoData
);
/**
 * Attempt to deliver a topic's content to a subscription.
 * @param {*} dbCtx db context
 * @param {string} subscriptionId subscription id
 * @param {string} requestId request id
 * @returns {Promise<void>}
 */
// Work one content delivery: load the subscription and its topic content, then either
// queue a denial (when the topic is deleted) or send the content to the subscriber's
// callback and record the result.
// NOTE(review): guards, try/catch lines, switch case labels, returns, closing braces and
// some object-literal entries are not visible in this excerpt; comments describe only what is shown.
530 async
subscriptionDeliveryProcess(dbCtx
, subscriptionId
, requestId
) {
531 const _scope
= _fileScope('subscriptionDeliveryProcess');
// Fields shared by the log entries that follow; more are added as they become known.
533 const logInfoData
= {
538 this.logger
.debug(_scope
, 'called', logInfoData
);
540 const subscription
= await
this.db
.subscriptionGetById(dbCtx
, subscriptionId
);
// Missing-subscription path: log and raise an internal inconsistency.
542 this.logger
.error(_scope
, 'no such subscription', logInfoData
);
543 throw new Errors
.InternalInconsistencyError('no such subscription');
546 logInfoData
.callback
= subscription
.callback
;
// Load the topic together with its content.
548 const topic
= await
this.db
.topicGetContentById(dbCtx
, subscription
.topicId
);
// Missing-topic path: log and raise an internal inconsistency.
550 this.logger
.error(_scope
, 'no such topic', logInfoData
);
551 throw new Errors
.InternalInconsistencyError('no such topic');
554 if (topic
.isDeleted
) {
555 // If a topic has been set deleted, it does not list us as a valid hub.
556 // Queue an unsubscription.
// The queued verification is a denial that is already marked publisher-validated.
557 const verification
= {
558 topicId: subscription
.topicId
,
559 callback: subscription
.callback
,
560 mode: Enum
.Mode
.Denied
,
561 reason: 'Gone: topic no longer valid on this hub.',
562 isPublisherValidated: true,
// Queue the denial and mark this delivery complete in one transaction.
566 await
this.db
.transaction(dbCtx
, async (txCtx
) => {
567 await
this.db
.verificationInsert(txCtx
, verification
);
568 await
this.db
.subscriptionDeliveryComplete(txCtx
, subscription
.callback
, subscription
.topicId
, topic
.contentUpdated
);
570 this.logger
.info(_scope
, 'update unsubscription for deleted topic', logInfoData
);
574 logInfoData
.contentLength
= topic
.content
.length
;
575 logInfoData
.contentHash
= topic
.contentHash
;
// Delivery request to the subscriber's callback; the response body is read as text.
577 const updateConfig
= {
579 url: subscription
.callback
,
// Link header names the topic as rel="self", followed by the hub suffix prepared in the constructor.
582 [Enum
.Header
.Link
]: `<${topic.url}>; rel="self"${this.linkHub}`,
// Content type falls back to Enum.ContentType.TextPlain when the topic has none stored.
583 [Enum
.Header
.ContentType
]: topic
.contentType
|| Enum
.ContentType
.TextPlain
,
// The signature header is included only for subscriptions that have a secret.
584 ...(subscription
.secret
&& { [Enum
.Header
.XHubSignature
]: Communication
.signature(topic
.content
, subscription
.secret
, subscription
.signatureAlgorithm
) }),
586 responseType: 'text',
589 this.logger
.info(_scope
, 'update request', logInfoData
);
// Send the content to the subscriber.
593 response
= await
this.got(updateConfig
);
// Request-failure path: log the error and record the delivery as incomplete with the retry backoff.
595 this.logger
.error(_scope
, 'update request failed', { ...logInfoData
, error: e
});
596 await
this.db
.subscriptionDeliveryIncomplete(dbCtx
, subscription
.callback
, subscription
.topicId
, this.options
.communication
.retryBackoffSeconds
);
// Attach a loggable form of the response to the log data.
599 logInfoData
.response
= common
.gotResponseLogData(response
);
600 this.logger
.debug(_scope
, 'update response', logInfoData
);
// Classify the response by HTTP status class (case labels are not visible in this excerpt).
602 switch (common
.httpStatusCodeClass(response
.statusCode
)) {
604 // Fall out of switch on success.
// Remote-server-error path: record the delivery as incomplete with the retry backoff.
608 this.logger
.info(_scope
, 'update remote server error', logInfoData
);
609 await
this.db
.subscriptionDeliveryIncomplete(dbCtx
, subscription
.callback
, subscription
.topicId
, this.options
.communication
.retryBackoffSeconds
);
613 if (response
.statusCode
=== 410) { // GONE
614 this.logger
.info(_scope
, 'client declined further updates', logInfoData
);
615 await
this.db
.subscriptionDeliveryGone(dbCtx
, subscription
.callback
, subscription
.topicId
);
618 // All other 4xx falls through as failure
// Any remaining non-2xx status is recorded as incomplete with the retry backoff.
621 this.logger
.info(_scope
, 'update failed with non-2xx status code', logInfoData
);
622 await
this.db
.subscriptionDeliveryIncomplete(dbCtx
, subscription
.callback
, subscription
.topicId
, this.options
.communication
.retryBackoffSeconds
);
// Success: record the delivery as complete, passing the topic's contentUpdated value.
626 await
this.db
.subscriptionDeliveryComplete(dbCtx
, subscription
.callback
, subscription
.topicId
, topic
.contentUpdated
);
627 this.logger
.info(_scope
, 'update success', logInfoData
);
632 * Claim and work a specific topic fetch task.
633 * @param {*} dbCtx db context
634 * @param {string} topicId topic id
635 * @param {string} requestId request id
636 * @returns {Promise<void>}
638 async
topicFetchClaimAndProcessById(dbCtx
, topicId
, requestId
) {
639 const _scope
= _fileScope('topicFetchClaimAndProcessById');
641 const claimResult
= await
this.db
.topicFetchClaimById(dbCtx
, topicId
, this.options
.communication
.claimTimeoutSeconds
, this.options
.nodeId
);
642 if (claimResult
.changes
!= 1) {
643 this.logger
.debug(_scope
, 'did not claim topic fetch', { topicId
, requestId
});
646 await
this.topicFetchProcess(dbCtx
, topicId
, requestId
);
651 * Claim and work a specific verification confirmation task.
652 * @param {*} dbCtx db context
653 * @param {*} verificationId verification id
654 * @param {string} requestId request id
655 * @returns {Promise<boolean>} whether to subsequently attempt next task if verification succeeds
657 async
verificationClaimAndProcessById(dbCtx
, verificationId
, requestId
) {
658 const _scope
= _fileScope('verificationClaimAndProcessById');
660 const claimResult
= await
this.db
.verificationClaimById(dbCtx
, verificationId
, this.options
.communication
.claimTimeoutSeconds
, this.options
.nodeId
);
661 if (claimResult
.changes
!= 1) {
662 this.logger
.debug(_scope
, 'did not claim verification', { verificationId
, requestId
});
665 await
this.verificationProcess(dbCtx
, verificationId
, requestId
);
/**
 * Claim pending work, up to the wanted amount, and begin processing it.
 * @param {*} dbCtx db context
 * @param {number} wanted maximum tasks to claim
 * @returns {Promise<void>[]} array of promises processing work
 */
// Claim pending work in priority order — topic fetches, then verifications, then content
// deliveries — and start processing each claimed item, collecting the resulting promises.
// NOTE(review): the try/catch lines and the final return are not visible in this excerpt;
// comments describe only the statements shown.
675 async
workFeed(dbCtx
, wanted
) {
676 const _scope
= _fileScope('workFeed');
// Promises for every task started by this call.
677 const inProgress
= [];
// A single request id is shared by all tasks started in this call.
678 const requestId
= common
.requestId();
679 const claimTimeoutSeconds
= this.options
.communication
.claimTimeoutSeconds
;
680 const nodeId
= this.options
.nodeId
;
// Initialized empty so the summary log below can report counts for phases that were not reached.
681 let topicFetchPromises
= [], verificationPromises
= [], updatePromises
= [];
683 this.logger
.debug(_scope
, 'called', { wanted
});
687 // Update topics before anything else.
688 const topicFetchIds
= await
this.db
.topicFetchClaim(dbCtx
, wanted
, claimTimeoutSeconds
, nodeId
);
// Each claimed item is processed via its own this.db.context() call.
689 topicFetchPromises
= topicFetchIds
.map((id
) => this.db
.context((ctx
) => this.topicFetchProcess(ctx
, id
, requestId
)));
690 inProgress
.push(...topicFetchPromises
);
// Reduce the remaining budget by the number of tasks just started.
691 wanted
-= topicFetchPromises
.length
;
695 // Then any pending verifications.
696 const verifications
= await
this.db
.verificationClaim(dbCtx
, wanted
, claimTimeoutSeconds
, nodeId
);
697 verificationPromises
= verifications
.map((id
) => this.db
.context((ctx
) => this.verificationProcess(ctx
, id
, requestId
)));
698 inProgress
.push(...verificationPromises
);
699 wanted
-= verificationPromises
.length
;
703 // Finally dole out content.
704 const updates
= await
this.db
.subscriptionDeliveryClaim(dbCtx
, wanted
, claimTimeoutSeconds
, nodeId
);
705 updatePromises
= updates
.map((id
) => this.db
.context((ctx
) => this.subscriptionDeliveryProcess(ctx
, id
, requestId
)));
706 inProgress
.push(...updatePromises
);
707 wanted
-= updatePromises
.length
;
// Failure path while claiming: the error is logged only.
710 this.logger
.error(_scope
, 'failed', { error: e
});
711 // do not re-throw, return what we've claimed so far
// Summarize how many tasks of each kind were started and how much of the budget remains.
713 this.logger
.debug(_scope
, 'searched for work', { topics: topicFetchPromises
.length
, verifications: verificationPromises
.length
, updates: updatePromises
.length
, wantedRemaining: wanted
, requestId
});
721 module
.exports
= Communication
;