Initial release
[websub-hub] / src / communication.js
1 'use strict';
2
3 /**
4 * Here we wrangle all outgoing requests, as well as the
5 * worker which initiates most of them.
6 */
7
8 const axios = require('axios');
9 const common = require('./common');
10 const crypto = require('crypto');
11 const Enum = require('./enum');
12 const Errors = require('./errors');
13 const Worker = require('./worker');
14 const LinkHelper = require('./link-helper');
15 const { version: packageVersion, name: packageName } = require('../package.json'); // For default UA string
16
17 const { performance } = require('perf_hooks');
18
19 const _fileScope = common.fileScope(__filename);
20
21 class Communication {
22 constructor(logger, db, options) {
23 this.logger = logger;
24 this.db = db;
25 this.options = options;
26 this.linkHelper = new LinkHelper(logger, options);
27
28 if (this.options.dingus.selfBaseUrl) {
29 this.linkHub = `, <${this.options.dingus.selfBaseUrl}>; rel="hub"`;
30 } else {
31 this.linkHub = '';
32 this.logger.error(_fileScope('constructor'), 'empty dingus.selfBaseUrl value, server responses will not be compliant');
33 }
34
35 // Set common options
36 this.axios = axios.create({
37 validateStatus: null, // Non-success responses are not exceptional
38 headers: {
39 [Enum.Header.UserAgent]: Communication.userAgentString(options.userAgent),
40 },
41 });
42
43 this.axios.interceptors.request.use((request) => {
44 request.startTimestampMs = performance.now();
45 return request;
46 });
47 this.axios.interceptors.response.use((response) => {
48 response.elapsedTimeMs = performance.now() - response.config.startTimestampMs;
49 return response;
50 });
51
52 this.worker = new Worker(logger, this.workFeed.bind(this), options);
53 this.worker.start();
54 }
55
56
57 static userAgentString(userAgentConfig) {
58 // eslint-disable-next-line security/detect-object-injection
59 const _conf = (field, def) => (userAgentConfig && field in userAgentConfig) ? userAgentConfig[field] : def;
60 const product = _conf('product', packageName);
61 const version = _conf('version', packageVersion);
62 let implementation = _conf('implementation', Enum.Specification);
63 if (implementation) {
64 implementation = ` (${implementation})`;
65 }
66 return `${product}/${version}${implementation}`;
67 }
68
69
70 /**
71 * Generate a random string.
72 * @param {Integer} bytes
73 * @returns {String}
74 */
75 static async generateChallenge(bytes = 30) {
76 return (await common.randomBytesAsync(bytes)).toString('base64');
77 }
78
79
80 /**
81 * Generate the signature string for content.
82 * @param {Buffer} message
83 * @param {Buffer} secret
84 * @param {String} algorithm
85 * @returns {String}
86 */
87 static signature(message, secret, algorithm) {
88 const hmac = crypto.createHmac(algorithm, secret);
89 hmac.update(message);
90 return `${algorithm}=${hmac.digest('hex')}`;
91 }
92
93
94 /**
95 * Generate the hash for content.
96 * @param {Buffer} content
97 * @param {String} algorithm
98 * @returns
99 */
100 static contentHash(content, algorithm) {
101 const hash = crypto.createHash(algorithm);
102 hash.update(content);
103 return hash.digest('hex');
104 }
105
106
107 /**
108 * A request skeleton config.
109 * @param {String} method
110 * @param {String} requestUrl
111 * @param {String} body
112 * @param {Object} params
113 */
114 static _axiosConfig(method, requestUrl, body, params = {}, headers = {}) {
115 const urlObj = new URL(requestUrl);
116 const config = {
117 method,
118 url: `${urlObj.origin}${urlObj.pathname}`,
119 params: urlObj.searchParams,
120 headers,
121 ...(body && { data: body }),
122 // Setting this does not appear to be enough to keep axios from parsing JSON response into object
123 responseType: 'text',
124 // So force the matter by eliding all response transformations
125 transformResponse: [ (res) => res ],
126 };
127 Object.entries(params).map(([k, v]) => config.params.set(k, v));
128 return config;
129 }
130
131
132 /**
133 * Create request config for verifying an intent.
134 * @param {URL} requestUrl
135 * @param {String} topicUrl
136 * @param {String} mode
137 * @param {Integer} leaseSeconds
138 * @param {String} challenge
139 */
140 static _intentVerifyAxiosConfig(requestUrl, topicUrl, mode, leaseSeconds, challenge) {
141 // Explicitly convert leaseSeconds to string, due to some DB backends. (Looking at you, sqlite..)
142 leaseSeconds = leaseSeconds.toString();
143
144 return Communication._axiosConfig('GET', requestUrl, undefined, {
145 'hub.mode': mode,
146 'hub.topic': topicUrl,
147 'hub.challenge': challenge,
148 'hub.lease_seconds': leaseSeconds,
149 }, {});
150 }
151
152
153 /**
154 * Create request config for denying an intent.
155 * @param {String} requestUrl
156 * @param {String} topicUrl
157 * @param {String} reason
158 * @returns {String}
159 */
160 static _intentDenyAxiosConfig(requestUrl, topicUrl, reason) {
161 return Communication._axiosConfig('GET', requestUrl, undefined, {
162 'hub.mode': Enum.Mode.Denied,
163 'hub.topic': topicUrl,
164 ...(reason && { 'hub.reason': reason }),
165 }, {});
166 }
167
168
169 /**
170 * Create request config for querying publisher for subscription validation.
171 * @param {Topic} topic
172 * @param {Verification} verification
173 * @returns {String}
174 */
175 static _publisherValidationAxiosConfig(topic, verification) {
176 const body = {
177 callback: verification.callback,
178 topic: topic.url,
179 ...(verification.httpFrom && { from: verification.httpFrom }),
180 ...(verification.httpRemoteAddr && { address: verification.httpRemoteAddr }),
181 };
182 return Communication._axiosConfig('POST', topic.publisherValidationUrl, body, {}, {
183 [Enum.Header.ContentType]: Enum.ContentType.ApplicationJson,
184 });
185 }
186
187
188 /**
189 * Create request config for fetching topic content.
190 * Prefer existing content-type, but accept anything.
191 * @param {Topic} topic
192 * @returns {String}
193 */
194 static _topicFetchAxiosConfig(topic) {
195 const acceptWildcard = '*/*' + (topic.contentType ? ';q=0.9' : '');
196 const acceptPreferred = [topic.contentType, acceptWildcard].filter((x) => x).join(', ');
197 return Communication._axiosConfig('GET', topic.url, undefined, {}, {
198 [Enum.Header.Accept]: acceptPreferred,
199 });
200 }
201
202
203 /**
204 * Attempt to verify a requested intent with client callback endpoint.
205 * @param {*} dbCtx
206 * @param {*} verificationId
207 * @param {String} requestId
208 * @returns {Boolean} whether to subsequently attempt next task if verification succeeds
209 */
210 async verificationProcess(dbCtx, verificationId, requestId) {
211 const _scope = _fileScope('verificationProcess');
212
213 const verification = await this.db.verificationGetById(dbCtx, verificationId);
214 if (!verification) {
215 this.logger.error(_scope, 'no such verification', { verificationId, requestId });
216 throw new Errors.InternalInconsistencyError('no such verification id');
217 }
218
219 const topic = await this.db.topicGetById(dbCtx, verification.topicId);
220 if (!topic) {
221 this.logger.error(_scope, 'no such topic id', { verification, requestId });
222 throw new Errors.InternalInconsistencyError('no such topic id');
223 }
224
225 if (!topic.isActive) {
226 this.logger.debug(_scope, 'topic not active, skipping verification', { verification, requestId });
227 await this.db.verificationRelease(dbCtx, verificationId);
228 return;
229 }
230
231 // If topic is deleted, deny any subscriptions.
232 // Un-subscriptions can continue to be verified.
233 if (topic.isDeleted && verification.mode === Enum.Mode.Subscribe) {
234 this.logger.info(_scope, 'topic is deleted, verification becomes denial', { verification, requestId });
235
236 verification.mode = Enum.Mode.Denied;
237 verification.reason = 'Gone: topic no longer valid on this hub.';
238 verification.isPublisherValidated = true;
239 await this.db.verificationUpdate(dbCtx, verification);
240 }
241
242 // If verification needs publisher validation, this delivery is for publisher.
243 if (verification.mode === Enum.Mode.Subscribe && verification.isPublisherValidated === false) {
244 this.logger.debug(_scope, 'attempting publisher validation', { verification, requestId });
245 const continueVerification = await this.publisherValidate(dbCtx, topic, verification);
246
247 // If publisher validation completed, verification will proceed.
248 // If not, we're done for now and shall try again later.
249 if (!continueVerification) {
250 this.logger.debug(_scope, 'publisher validation did not complete, belaying verification', { verification });
251 await this.db.verificationIncomplete(dbCtx, verificationId, this.options.communication.retryBackoffSeconds);
252 return;
253 }
254 }
255
256 const u = new URL(verification.callback);
257 let callbackRequestConfig, challenge;
258 if (verification.mode === Enum.Mode.Denied) {
259 // Denials don't have a challenge.
260 callbackRequestConfig = Communication._intentDenyAxiosConfig(u, topic.url, verification.reason);
261 } else {
262 // Subscriptions and unsubscriptions require challenge matching.
263 challenge = await Communication.generateChallenge();
264 callbackRequestConfig = Communication._intentVerifyAxiosConfig(u, topic.url, verification.mode, verification.leaseSeconds, challenge);
265 }
266
267 const logInfoData = {
268 callbackUrl: u.href,
269 topicUrl: topic.url,
270 mode: verification.mode,
271 originalRequestId: verification.requestId,
272 requestId,
273 verificationId,
274 };
275
276 this.logger.info(_scope, 'verification request', logInfoData);
277
278 let response;
279 try {
280 response = await this.axios(callbackRequestConfig);
281 } catch (e) {
282 this.logger.error(_scope, 'verification request failed', { ...logInfoData, error: e });
283 await this.db.verificationIncomplete(dbCtx, verificationId, this.options.communication.retryBackoffSeconds);
284 return;
285 }
286 logInfoData.response = common.axiosResponseLogData(response);
287 this.logger.debug(_scope, 'verification response', logInfoData );
288
289 let verificationAccepted = true; // Presume success.
290
291 switch (common.httpStatusCodeClass(response.status)) {
292 case 2:
293 // Success, fall out of switch.
294 break;
295
296 case 5:
297 // Retry
298 this.logger.info(_scope, 'verification remote server error', logInfoData );
299 await this.db.verificationIncomplete(dbCtx, verificationId, this.options.communication.retryBackoffSeconds);
300 return;
301
302 default:
303 // Anything else is unsuccessful.
304 this.logger.info(_scope, 'verification rejected by status', logInfoData );
305 verificationAccepted = false;
306 }
307
308 // Any denial is not accepted.
309 if (verification.mode === Enum.Mode.Denied) {
310 this.logger.info(_scope, 'verification denial accepted', logInfoData );
311 verificationAccepted = false;
312 }
313
314 if ([Enum.Mode.Subscribe, Enum.Mode.Unsubscribe].includes(verification.mode)
315 && response.data !== challenge) {
316 this.logger.info(_scope, 'verification rejected by challenge', logInfoData);
317 verificationAccepted = false;
318 }
319
320 await this.db.transaction(dbCtx, async (txCtx) => {
321 switch (verification.mode) {
322 case Enum.Mode.Subscribe:
323 if (verificationAccepted) {
324 await this.db.subscriptionUpsert(txCtx, verification);
325 }
326 break;
327
328 case Enum.Mode.Unsubscribe:
329 if (verificationAccepted) {
330 await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
331 }
332 break;
333
334 case Enum.Mode.Denied:
335 await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
336 break;
337
338 default:
339 this.logger.error(_scope, 'unanticipated mode', { logInfoData });
340 throw new Errors.InternalInconsistencyError(verification.mode);
341 }
342
343 await this.db.verificationComplete(dbCtx, verificationId, verification.callback, verification.topicId);
344 }); // txCtx
345
346 this.logger.info(_scope, 'verification complete', { ...logInfoData, verificationAccepted });
347 }
348
349
350 /**
351 * Attempt to verify a pending subscription request with publisher.
352 * Updates (and persists) verification.
353 * Returns boolean of status of publisher contact, and hence
354 * whether to continue verification with client.
355 * @param {*} dbCtx
356 * @param {TopicData} topic
357 * @param {VerificationData} verification
358 * @returns {Boolean}
359 */
360 async publisherValidate(dbCtx, topic, verification) {
361 const _scope = _fileScope('publisherValidate');
362 const publisherValidationRequestConfig = Communication._publisherValidationAxiosConfig(topic, verification);
363 const logInfoData = {
364 topicUrl: topic.url,
365 callbackUrl: verification.callback,
366 requestId: verification.requestId,
367 };
368 let response;
369
370 this.logger.info(_scope, 'publisher validation request', logInfoData);
371
372 try {
373 response = await this.axios(publisherValidationRequestConfig);
374 } catch (e) {
375 this.logger.error(_scope, 'publisher validation failed', { ...logInfoData, error: e });
376 return false; // Do not continue with client verification.
377 }
378
379 logInfoData.response = common.axiosResponseLogData(response);
380 this.logger.debug(_scope, 'validation response', logInfoData);
381
382 let verificationNeedsUpdate = false;
383 switch (common.httpStatusCodeClass(response.status)) {
384 case 2:
385 this.logger.info(_scope, 'publisher validation complete, allowed', logInfoData);
386 break;
387
388 case 5:
389 this.logger.info(_scope, 'publisher validation remote server error', logInfoData);
390 return false; // Do not continue with client verification.
391
392 default:
393 this.logger.info(_scope, 'publisher validation complete, denied', logInfoData);
394 // Change client verification
395 verification.mode = Enum.Mode.Denied;
396 verification.reason = 'publisher rejected request'; // TODO: details from response?
397 verificationNeedsUpdate = true;
398 }
399
400 // Success from publisher, either accepted or denied.
401 // Set validated flag, and allow client verification to continue.
402 await this.db.transaction(dbCtx, async (txCtx) => {
403 if (verificationNeedsUpdate) {
404 await this.db.verificationUpdate(txCtx, verification.id, verification);
405 }
406 await this.db.verificationValidated(txCtx, verification.id);
407 });
408 return true;
409 }
410
411
412 /**
413 * Retrieve content from a topic.
414 * @param {*} dbCtx
415 * @param {*} topicId
416 * @param {String} requestId
417 * @returns
418 */
419 async topicFetchProcess(dbCtx, topicId, requestId) {
420 const _scope = _fileScope('topicFetchProcess');
421 const logInfoData = {
422 topicId,
423 requestId,
424 };
425
426 this.logger.debug(_scope, 'called', logInfoData);
427
428 const topic = await this.db.topicGetById(dbCtx, topicId);
429 if (topic === undefined) {
430 this.logger.error(_scope, 'no such topic id', logInfoData);
431 throw new Errors.InternalInconsistencyError('no such topic id');
432 }
433
434 logInfoData.url = topicId.url;
435
436 if (topic.isDeleted) {
437 this.logger.debug(_scope, 'topic deleted, skipping update request', logInfoData);
438 return;
439 }
440
441 const updateRequestConfig = Communication._topicFetchAxiosConfig(topic);
442
443 this.logger.info(_scope, 'topic update request', logInfoData);
444
445 let response;
446 try {
447 response = await this.axios(updateRequestConfig);
448 } catch (e) {
449 this.logger.error(_scope, 'update request failed', logInfoData);
450 await this.db.topicFetchIncomplete(dbCtx, topicId, this.options.communication.retryBackoffSeconds);
451 return;
452 }
453 logInfoData.response = common.axiosResponseLogData(response);
454 this.logger.debug(_scope, 'fetch response', logInfoData);
455
456 switch (common.httpStatusCodeClass(response.status)) {
457 case 2:
458 // Fall out of switch on success
459 break;
460
461 case 5:
462 this.logger.info(_scope, 'update remote server error', logInfoData);
463 await this.db.topicFetchIncomplete(dbCtx, topicId, this.options.communication.retryBackoffSeconds);
464 return;
465
466 default:
467 this.logger.info(_scope, 'fetch failed by status', logInfoData);
468 await this.db.topicFetchIncomplete(dbCtx, topicId, this.options.communication.retryBackoffSeconds);
469 return;
470 }
471
472 const contentHash = Communication.contentHash(response.data, topic.contentHashAlgorithm);
473 logInfoData.contentHash = contentHash;
474 if (topic.contentHash === contentHash) {
475 this.logger.info(_scope, 'content has not changed', logInfoData);
476 await this.db.topicFetchComplete(dbCtx, topicId);
477 return;
478 }
479
480 const validHub = await this.linkHelper.validHub(topic.url, response.headers, response.data);
481 if (!validHub) {
482 this.logger.debug(_scope, 'retrieved topic does not list us as hub', { logInfoData });
483 if (this.options.communication.strictTopicHubLink) {
484 await this.db.transaction(dbCtx, async (txCtx) => {
485 // Set as deleted and set content_updated so subscriptions are notified.
486 await this.db.topicDeleted(txCtx, topicId);
487 await this.db.topicFetchComplete(txCtx, topicId);
488 });
489 return;
490 }
491 }
492
493 const contentType = response.headers[Enum.Header.ContentType.toLowerCase()];
494
495 await this.db.transaction(dbCtx, async (txCtx) => {
496 await this.db.topicSetContent(txCtx, {
497 topicId,
498 content: Buffer.from(response.data),
499 contentHash,
500 ...(contentType && { contentType }),
501 });
502
503 await this.db.topicFetchComplete(txCtx, topicId);
504 });
505 this.logger.info(_scope, 'content updated', logInfoData);
506 }
507
508
509 /**
510 * Attempt to deliver a topic's content to a subscription.
511 * @param {*} dbCtx
512 * @param {String} callback
513 * @param {*} topicId
514 * @param {String} requestId
515 */
516 async subscriptionDeliveryProcess(dbCtx, subscriptionId, requestId) {
517 const _scope = _fileScope('subscriptionDeliveryProcess');
518
519 const logInfoData = {
520 subscriptionId,
521 requestId,
522 };
523
524 this.logger.debug(_scope, 'called', logInfoData);
525
526 const subscription = await this.db.subscriptionGetById(dbCtx, subscriptionId);
527 if (!subscription) {
528 this.logger.error(_scope, 'no such subscription', logInfoData);
529 throw new Errors.InternalInconsistencyError('no such subscription');
530 }
531
532 logInfoData.callback = subscription.callback;
533
534 const topic = await this.db.topicGetContentById(dbCtx, subscription.topicId);
535 if (!topic) {
536 this.logger.error(_scope, 'no such topic', logInfoData);
537 throw new Errors.InternalInconsistencyError('no such topic');
538 }
539
540 if (topic.isDeleted) {
541 // If a topic has been set deleted, it does not list us as a valid hub.
542 // Queue an unsubscription.
543 const verification = {
544 topicId: subscription.topicId,
545 callback: subscription.callback,
546 mode: Enum.Mode.Denied,
547 reason: 'Gone: topic no longer valid on this hub.',
548 isPublisherValidated: true,
549 requestId,
550 };
551
552 await this.db.transaction(dbCtx, async (txCtx) => {
553 await this.db.verificationInsert(txCtx, verification);
554 await this.db.subscriptionDeliveryComplete(txCtx, subscription.callback, subscription.topicId);
555 });
556 this.logger.info(_scope, 'update unsubscription for deleted topic', logInfoData);
557 return;
558 }
559
560 logInfoData.contentLength = topic.content.length;
561 logInfoData.contentHash = topic.contentHash;
562
563 const updateAxiosConfig = Communication._axiosConfig('POST', subscription.callback, topic.content, {}, {
564 [Enum.Header.Link]: `<${topic.url}>; rel="self"${this.linkHub}`,
565 [Enum.Header.ContentType]: topic.contentType || Enum.ContentType.TextPlain,
566 ...(subscription.secret && { [Enum.Header.XHubSignature]: Communication.signature(topic.content, subscription.secret, subscription.signatureAlgorithm) }),
567 });
568
569 this.logger.info(_scope, 'update request', logInfoData);
570
571 let response;
572 try {
573 response = await this.axios(updateAxiosConfig);
574 } catch (e) {
575 this.logger.error(_scope, 'update request failed', { ...logInfoData, error: e });
576 await this.db.subscriptionDeliveryIncomplete(dbCtx, subscription.callback, subscription.topicId, this.options.communication.retryBackoffSeconds);
577 return;
578 }
579 logInfoData.response = common.axiosResponseLogData(response);
580 this.logger.debug(_scope, 'update response', logInfoData);
581
582 switch (common.httpStatusCodeClass(response.status)) {
583 case 2:
584 // Fall out of switch on success.
585 break;
586
587 case 5:
588 this.logger.info(_scope, 'update remote server error', logInfoData);
589 await this.db.subscriptionDeliveryIncomplete(dbCtx, subscription.callback, subscription.topicId, this.options.communication.retryBackoffSeconds);
590 return;
591
592 case 4:
593 if (response.status === 410) { // GONE
594 this.logger.info(_scope, 'client declined further updates', logInfoData);
595 await this.db.subscriptionDeliveryGone(dbCtx, subscription.callback, subscription.topicId);
596 return;
597 }
598 // All other 4xx falls through as failure
599
600 default:
601 this.logger.info(_scope, 'update failed with non-2xx status code', logInfoData);
602 await this.db.subscriptionDeliveryIncomplete(dbCtx, subscription.callback, subscription.topicId, this.options.communication.retryBackoffSeconds);
603 return;
604 }
605
606 await this.db.subscriptionDeliveryComplete(dbCtx, subscription.callback, subscription.topicId);
607 this.logger.info(_scope, 'update success', logInfoData);
608 }
609
610
611 /**
612 * Claim and work a specific topic fetch task.
613 * @param {*} dbCtx
614 * @param {*} id
615 * @param {String} requestId
616 */
617 async topicFetchClaimAndProcessById(dbCtx, topicId, requestId) {
618 const _scope = _fileScope('topicFetchClaimAndProcessById');
619
620 const claimResult = await this.db.topicFetchClaimById(dbCtx, topicId, this.options.communication.claimTimeoutSeconds, this.options.nodeId);
621 if (claimResult.changes != 1) {
622 this.logger.debug(_scope, 'did not claim topic fetch', { topicId, requestId });
623 return;
624 }
625 await this.topicFetchProcess(dbCtx, topicId, requestId);
626 }
627
628
629 /**
630 * Claim and work a specific verification confirmation task.
631 * @param {*} dbCtx
632 * @param {*} verificationId
633 * @param {String} requestId
634 * @returns
635 */
636 async verificationClaimAndProcessById(dbCtx, verificationId, requestId) {
637 const _scope = _fileScope('verificationClaimAndProcessById');
638
639 const claimResult = await this.db.verificationClaimById(dbCtx, verificationId, this.options.communication.claimTimeoutSeconds, this.options.nodeId);
640 if (claimResult.changes != 1) {
641 this.logger.debug(_scope, 'did not claim verification', { verificationId, requestId });
642 return;
643 }
644 await this.verificationProcess(dbCtx, verificationId, requestId);
645 }
646
647
648 /**
649 *
650 * @param {Number} wanted maximum tasks to claim
651 * @returns {Promise<void>[]}
652 */
653 async workFeed(wanted) {
654 const _scope = _fileScope('workFeed');
655 const inProgress = [];
656 const requestId = common.requestId();
657 const claimTimeoutSeconds = this.options.communication.claimTimeoutSeconds;
658 const nodeId = this.options.nodeId;
659 let topicFetchPromises = [], verificationPromises = [], updatePromises = [];
660
661 this.logger.debug(_scope, 'called', { wanted });
662
663 try {
664 await this.db.context(async (dbCtx) => {
665 if (wanted > 0) {
666 // Update topics before anything else.
667 const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
668 // Each task gets a new context, as these map to connections in some dbs.
669 // This dbCtx goes away after launching the processing functions, so would not be available to tasks.
670 topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId)));
671 inProgress.push(...topicFetchPromises);
672 wanted -= topicFetchPromises.length;
673 }
674
675 if (wanted > 0) {
676 // Then any pending verifications.
677 const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
678 verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId)));
679 inProgress.push(...verificationPromises);
680 wanted -= verificationPromises.length;
681 }
682
683 if (wanted > 0) {
684 // Finally dole out content.
685 const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
686 updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId)));
687 inProgress.push(...updatePromises);
688 wanted -= updatePromises.length;
689 }
690 }); // dbCtx
691 } catch (e) {
692 this.logger.error(_scope, 'failed', { error: e });
693 // do not re-throw, return what we've claimed so far
694 }
695 this.logger.debug(_scope, 'searched for work', { topics: topicFetchPromises.length, verifications: verificationPromises.length, updates: updatePromises.length, wantedRemaining: wanted, requestId });
696
697 return inProgress;
698 }
699
700
701 }
702
703 module.exports = Communication;