update dependencies, devDependencies, copyright
[websub-hub] / src / communication.js
1 'use strict';
2
3 /**
4 * Here we wrangle all outgoing requests, as well as the
5 * worker which initiates most of them.
6 */
7
8 const common = require('./common');
9 const crypto = require('crypto');
10 const Enum = require('./enum');
11 const Errors = require('./errors');
12 const Worker = require('./worker');
13 const LinkHelper = require('./link-helper');
14 const { version: packageVersion, name: packageName } = require('../package.json'); // For default UA string
15
// Per-file scope helper: called with a method name, and its result is passed
// as the first argument to every logger call in this file.
// NOTE(review): presumably yields a label combining this file and the method name — confirm in ./common.
const _fileScope = common.fileScope(__filename);
17
class Communication {
  /**
   * Set up outgoing-request handling and start the worker which is fed by workFeed.
   * @param {*} logger object providing debug/info/error methods
   * @param {*} db database interface
   * @param {Object} options
   * @param {Object} options.dingus
   * @param {String=} options.dingus.selfBaseUrl advertised as rel="hub" in delivery Link headers
   * @param {Object} options.communication
   * @param {Object=} options.userAgent
   */
  constructor(logger, db, options) {
    this.logger = logger;
    this.db = db;
    this.options = options;
    this.linkHelper = new LinkHelper(logger, options);

    if (this.options.dingus.selfBaseUrl) {
      this.linkHub = `, <${this.options.dingus.selfBaseUrl}>; rel="hub"`;
    } else {
      this.linkHub = '';
      this.logger.error(_fileScope('constructor'), 'empty dingus.selfBaseUrl value, server responses will not be compliant');
    }

    this.Got = undefined; // Will become the async imported got.
    this.got = this._init; // First invocation imports got and replaces this.

    this.worker = new Worker(logger, db, this.workFeed.bind(this), options);
    this.worker.start();
  }


  /**
   * Do a little dance to cope with ESM dynamic import.
   * On first call, imports got and replaces this.got with a configured
   * instance; any arguments are then forwarded to that instance.
   * @param {...any} args request arguments, passed through to got
   * @returns {Promise<any>} the got response when args were given, otherwise undefined
   */
  async _init(...args) {
    if (!this.Got) {
      // For some reason eslint is confused about import being supported here.
      // eslint-disable-next-line
      this.Got = await import('got');
      this.got = this.Got.got.extend({
        followRedirect: false, // Outgoing API calls should not encounter redirects
        throwHttpErrors: false, // We will be checking status codes explicitly
        headers: {
          [Enum.Header.UserAgent]: Communication.userAgentString(this.options.userAgent),
        },
        timeout: {
          request: this.options.communication.requestTimeoutMs || 120000,
        },
        hooks: {
          beforeRetry: [
            // Must be bound: the hook is invoked as a bare function, and
            // _onRetry needs this.logger.
            this._onRetry.bind(this),
          ],
        },
      });
    }

    /* istanbul ignore if */
    if (args.length) {
      /* istanbul ignore next */
      return this.got(...args);
    }
  }


  /**
   * Take note of transient retries.
   * @param {*} error
   * @param {*} retryCount
   */
  _onRetry(error, retryCount) {
    const _scope = _fileScope('_onRetry');
    this.logger.debug(_scope, 'retry', { retryCount, error });
  }


  /**
   * Construct a user-agent value of the form `product/version (implementation)`.
   * Any field absent from the config falls back to the package name, the
   * package version, and Enum.Specification respectively.  Only the last
   * '/'-separated segment of the product is used.
   * @param {Object} userAgentConfig
   * @param {String=} userAgentConfig.product
   * @param {String=} userAgentConfig.version
   * @param {String=} userAgentConfig.implementation falsy value omits the parenthetical
   * @returns {String}
   */
  static userAgentString(userAgentConfig) {
    // eslint-disable-next-line security/detect-object-injection
    const _conf = (field, def) => (userAgentConfig && field in userAgentConfig) ? userAgentConfig[field] : def;
    const product = _conf('product', packageName).split('/').pop();
    const version = _conf('version', packageVersion);
    const implementation = _conf('implementation', Enum.Specification);
    // An explicitly falsy implementation (null, undefined, '') yields no
    // suffix, rather than being interpolated as literal text.
    const suffix = implementation ? ` (${implementation})` : '';
    return `${product}/${version}${suffix}`;
  }


  /**
   * Generate a random string.
   * @param {Integer} bytes how many random bytes to request
   * @returns {Promise<String>} base64 encoding of the random bytes
   */
  static async generateChallenge(bytes = 30) {
    return (await common.randomBytesAsync(bytes)).toString('base64');
  }


  /**
   * Generate the signature string for content.
   * @param {Buffer} message
   * @param {Buffer} secret
   * @param {String} algorithm digest name understood by crypto.createHmac
   * @returns {String} `algorithm=hex-encoded-hmac`
   */
  static signature(message, secret, algorithm) {
    const hmac = crypto.createHmac(algorithm, secret)
      .update(message)
      .digest('hex');
    return `${algorithm}=${hmac}`;
  }


  /**
   * Generate the hash for content.
   * @param {Buffer} content
   * @param {String} algorithm digest name understood by crypto.createHash
   * @returns {String} hex-encoded digest
   */
  static contentHash(content, algorithm) {
    return crypto.createHash(algorithm)
      .update(content)
      .digest('hex');
  }


  /**
   * Attempt to verify a requested intent with client callback endpoint.
   * Depending on outcome, the verification is released, marked incomplete
   * for a later retry, or completed (updating the subscription to match).
   * @param {*} dbCtx
   * @param {*} verificationId
   * @param {String} requestId
   * @returns {Promise<void>}
   * @throws {Errors.InternalInconsistencyError} when the verification or its topic does not exist, or the mode is unrecognized
   */
  async verificationProcess(dbCtx, verificationId, requestId) {
    const _scope = _fileScope('verificationProcess');

    const verification = await this.db.verificationGetById(dbCtx, verificationId);
    if (!verification) {
      this.logger.error(_scope, 'no such verification', { verificationId, requestId });
      throw new Errors.InternalInconsistencyError('no such verification id');
    }

    const topic = await this.db.topicGetById(dbCtx, verification.topicId);
    if (!topic) {
      this.logger.error(_scope, Enum.Message.NoSuchTopicId, { verification, requestId });
      throw new Errors.InternalInconsistencyError(Enum.Message.NoSuchTopicId);
    }

    if (!topic.isActive) {
      // These should be filtered out when selecting verification tasks to process.
      this.logger.debug(_scope, 'topic not active, skipping verification', { verification, requestId });
      await this.db.verificationRelease(dbCtx, verificationId);
      return;
    }

    // If topic is deleted, deny any subscriptions.
    // Un-subscriptions can continue to be verified.
    if (topic.isDeleted && verification.mode === Enum.Mode.Subscribe) {
      this.logger.info(_scope, 'topic is deleted, verification becomes denial', { verification, requestId });

      verification.mode = Enum.Mode.Denied;
      verification.reason = 'Gone: topic no longer valid on this hub.';
      verification.isPublisherValidated = true;
      // Same (ctx, id, data) argument order as the update in publisherValidate.
      await this.db.verificationUpdate(dbCtx, verificationId, verification);
    }

    // If verification needs publisher validation, this delivery is for publisher.
    if (verification.mode === Enum.Mode.Subscribe && verification.isPublisherValidated === false) {
      this.logger.debug(_scope, 'attempting publisher validation', { verification, requestId });
      const continueVerification = await this.publisherValidate(dbCtx, topic, verification);

      // If publisher validation completed, verification will proceed.
      // If not, we're done for now and shall try again later.
      if (!continueVerification) {
        this.logger.debug(_scope, 'publisher validation did not complete, belaying verification', { verification });
        await this.db.verificationIncomplete(dbCtx, verificationId, this.options.communication.retryBackoffSeconds);
        return;
      }
    }

    const callbackRequestConfig = {
      method: 'GET',
      url: new URL(verification.callback),
      responseType: 'text',
    };
    const callbackParams = {
      'hub.topic': topic.url,
      'hub.mode': verification.mode,
    };

    let challenge;
    if (verification.mode === Enum.Mode.Denied) {
      // Denials don't have a challenge, but might have a reason.
      if (verification.reason) {
        callbackParams['hub.reason'] = verification.reason;
      }
    } else {
      // Subscriptions and unsubscriptions require challenge matching.
      challenge = await Communication.generateChallenge();
      Object.assign(callbackParams, {
        'hub.challenge': challenge,
        // Explicitly convert leaseSeconds to string, due to some DB backends. (Looking at you, sqlite..)
        'hub.lease_seconds': verification.leaseSeconds.toString(),
      });
    }
    Object.entries(callbackParams)
      .forEach(([k, v]) => callbackRequestConfig.url.searchParams.set(k, v))
    ;

    const logInfoData = {
      callbackUrl: callbackRequestConfig.url.href,
      topicUrl: topic.url,
      mode: verification.mode,
      originalRequestId: verification.requestId,
      requestId,
      verificationId,
    };

    this.logger.info(_scope, 'verification request', logInfoData);

    let response;
    try {
      response = await this.got(callbackRequestConfig);
    } catch (e) {
      this.logger.error(_scope, 'verification request failed', { ...logInfoData, error: e });
      await this.db.verificationIncomplete(dbCtx, verificationId, this.options.communication.retryBackoffSeconds);
      return;
    }
    logInfoData.response = common.gotResponseLogData(response);
    this.logger.debug(_scope, 'verification response', logInfoData);

    let verificationAccepted = true; // Presume success.

    switch (common.httpStatusCodeClass(response.statusCode)) {
      case 2:
        // Success, fall out of switch.
        break;

      case 5:
        // Retry
        this.logger.info(_scope, 'verification remote server error', logInfoData);
        await this.db.verificationIncomplete(dbCtx, verificationId, this.options.communication.retryBackoffSeconds);
        return;

      default:
        // Anything else is unsuccessful.
        this.logger.info(_scope, 'verification rejected by status', logInfoData);
        verificationAccepted = false;
    }

    // Any denial is not accepted.
    if (verification.mode === Enum.Mode.Denied) {
      this.logger.info(_scope, 'verification denial accepted', logInfoData);
      verificationAccepted = false;
    }

    // The response body must echo the challenge exactly.
    if ([Enum.Mode.Subscribe, Enum.Mode.Unsubscribe].includes(verification.mode)
    &&  response.body !== challenge) {
      this.logger.info(_scope, 'verification rejected by challenge', logInfoData);
      verificationAccepted = false;
    }

    await this.db.transaction(dbCtx, async (txCtx) => {
      switch (verification.mode) {
        case Enum.Mode.Subscribe:
          if (verificationAccepted) {
            await this.db.subscriptionUpsert(txCtx, verification);
          }
          break;

        case Enum.Mode.Unsubscribe:
          if (verificationAccepted) {
            await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
            if (topic.isDeleted) {
              // Remove a deleted topic after the last subscription is notified.
              await this.db.topicPendingDelete(txCtx, topic.id);
            }
          }
          break;

        case Enum.Mode.Denied:
          await this.db.subscriptionDelete(txCtx, verification.callback, verification.topicId);
          if (topic.isDeleted) {
            // Remove a deleted topic after the last subscription is notified.
            await this.db.topicPendingDelete(txCtx, topic.id);
          }
          break;

        default:
          this.logger.error(_scope, 'unanticipated mode', logInfoData);
          throw new Errors.InternalInconsistencyError(verification.mode);
      }

      await this.db.verificationComplete(txCtx, verificationId, verification.callback, verification.topicId);
    }); // txCtx

    this.logger.info(_scope, 'verification complete', { ...logInfoData, verificationAccepted });
  }


  /**
   * Attempt to verify a pending subscription request with publisher.
   * Updates (and persists) verification.
   * Returns boolean of status of publisher contact, and hence
   * whether to continue verification with client.
   *
   * This is not defined by the spec.  We opt to speak JSON here.
   * @param {*} dbCtx
   * @param {TopicData} topic
   * @param {VerificationData} verification mode and reason are overwritten when the publisher denies
   * @returns {Promise<Boolean>} false when the request failed or the publisher returned 5xx
   */
  async publisherValidate(dbCtx, topic, verification) {
    const _scope = _fileScope('publisherValidate');
    const logInfoData = {
      topicUrl: topic.url,
      callbackUrl: verification.callback,
      requestId: verification.requestId,
    };
    let response;

    this.logger.info(_scope, 'publisher validation request', logInfoData);

    const publisherValidationRequestConfig = {
      method: 'POST',
      url: topic.publisherValidationUrl,
      json: {
        callback: verification.callback,
        topic: topic.url,
        ...(verification.httpFrom && { from: verification.httpFrom }),
        ...(verification.httpRemoteAddr && { address: verification.httpRemoteAddr }),
      },
      responseType: 'json',
    };
    try {
      response = await this.got(publisherValidationRequestConfig);
    } catch (e) {
      this.logger.error(_scope, 'publisher validation failed', { ...logInfoData, error: e });
      return false; // Do not continue with client verification.
    }

    logInfoData.response = common.gotResponseLogData(response);
    this.logger.debug(_scope, 'validation response', logInfoData);

    let verificationNeedsUpdate = false;
    switch (common.httpStatusCodeClass(response.statusCode)) {
      case 2:
        this.logger.info(_scope, 'publisher validation complete, allowed', logInfoData);
        break;

      case 5:
        this.logger.info(_scope, 'publisher validation remote server error', logInfoData);
        return false; // Do not continue with client verification.

      default:
        this.logger.info(_scope, 'publisher validation complete, denied', logInfoData);
        // Change client verification
        verification.mode = Enum.Mode.Denied;
        verification.reason = 'publisher rejected request'; // TODO: details from response?
        verificationNeedsUpdate = true;
    }

    // Success from publisher, either accepted or denied.
    // Set validated flag, and allow client verification to continue.
    await this.db.transaction(dbCtx, async (txCtx) => {
      if (verificationNeedsUpdate) {
        await this.db.verificationUpdate(txCtx, verification.id, verification);
      }
      await this.db.verificationValidated(txCtx, verification.id);
    });
    return true;
  }


  /**
   * Retrieve content from a topic.
   * Fetches the topic url with conditional headers, and stores the content
   * when its hash differs from what the topic already holds.
   * @param {*} dbCtx
   * @param {*} topicId
   * @param {String} requestId
   * @returns {Promise<void>}
   * @throws {Errors.InternalInconsistencyError} when the topic does not exist
   */
  async topicFetchProcess(dbCtx, topicId, requestId) {
    const _scope = _fileScope('topicFetchProcess');
    const logInfoData = {
      topicId,
      requestId,
    };

    this.logger.debug(_scope, 'called', logInfoData);

    const topic = await this.db.topicGetById(dbCtx, topicId);
    // Any empty result (undefined or null) means there is no such topic.
    if (!topic) {
      this.logger.error(_scope, Enum.Message.NoSuchTopicId, logInfoData);
      throw new Errors.InternalInconsistencyError(Enum.Message.NoSuchTopicId);
    }

    // Cull any expired subscriptions
    await this.db.subscriptionDeleteExpired(dbCtx, topicId);

    logInfoData.url = topic.url;

    if (topic.isDeleted) {
      this.logger.debug(_scope, 'topic deleted, skipping update request', logInfoData);
      return;
    }

    const updateRequestConfig = {
      followRedirect: true,
      method: 'GET',
      url: topic.url,
      headers: {
        // Prefer the known content type, when there is one, but accept anything.
        [Enum.Header.Accept]: [topic.contentType, `*/*${topic.contentType ? ';q=0.9' : ''}`].filter((x) => x).join(', '),
        ...(topic.httpEtag && { [Enum.Header.IfNoneMatch]: topic.httpEtag }),
        ...(topic.httpLastModified && { [Enum.Header.IfModifiedSince]: topic.httpLastModified }),
      },
      responseType: 'buffer',
    };

    this.logger.info(_scope, 'topic update request', logInfoData);

    let response;
    try {
      response = await this.got(updateRequestConfig);
    } catch (e) {
      this.logger.error(_scope, 'update request failed', { ...logInfoData, error: e });
      await this.db.topicFetchIncomplete(dbCtx, topicId, this.options.communication.retryBackoffSeconds);
      return;
    }
    logInfoData.response = common.gotResponseLogData(response);
    this.logger.debug(_scope, 'fetch response', logInfoData);

    switch (common.httpStatusCodeClass(response.statusCode)) {
      case 2:
      case 3:
        // Fall out of switch on success
        break;

      case 5:
        this.logger.info(_scope, 'update remote server error', logInfoData);
        await this.db.topicFetchIncomplete(dbCtx, topicId, this.options.communication.retryBackoffSeconds);
        return;

      default:
        this.logger.info(_scope, 'fetch failed by status', logInfoData);
        await this.db.topicFetchIncomplete(dbCtx, topicId, this.options.communication.retryBackoffSeconds);
        return;
    }

    if (response.statusCode === 304) {
      this.logger.info(_scope, 'content has not changed, per server', logInfoData);
      await this.db.topicFetchComplete(dbCtx, topicId);
      return;
    }

    const contentHash = Communication.contentHash(response.body, topic.contentHashAlgorithm);
    logInfoData.contentHash = contentHash;
    if (topic.contentHash === contentHash) {
      this.logger.info(_scope, 'content has not changed', logInfoData);
      await this.db.topicFetchComplete(dbCtx, topicId);
      return;
    }

    const validHub = await this.linkHelper.validHub(topic.url, response.headers, response.body);
    if (!validHub) {
      this.logger.info(_scope, 'retrieved topic does not list us as hub', logInfoData);
      if (this.options.communication.strictTopicHubLink) {
        await this.db.transaction(dbCtx, async (txCtx) => {
          // Set as deleted and set content_updated so subscriptions are notified.
          await this.db.topicDeleted(txCtx, topicId);
          await this.db.topicFetchComplete(txCtx, topicId);
        });
        // Attempt to remove from db, if no active subscriptions.
        await this.db.topicPendingDelete(dbCtx, topicId);
        return;
      }
    }

    const contentType = response.headers[Enum.Header.ContentType.toLowerCase()];
    const httpETag = response.headers[Enum.Header.ETag.toLowerCase()];
    const httpLastModified = response.headers[Enum.Header.LastModified.toLowerCase()];

    await this.db.transaction(dbCtx, async (txCtx) => {
      await this.db.topicSetContent(txCtx, {
        topicId,
        content: Buffer.from(response.body),
        contentHash,
        ...(contentType && { contentType }),
        ...(httpETag && { httpETag }),
        ...(httpLastModified && { httpLastModified }),
      });

      await this.db.topicFetchComplete(txCtx, topicId);
    });
    this.logger.info(_scope, 'content updated', logInfoData);
  }


  /**
   * Attempt to deliver a topic's content to a subscription.
   * For a deleted topic, a denial verification is queued instead of
   * delivering content.
   * @param {*} dbCtx
   * @param {*} subscriptionId
   * @param {String} requestId
   * @returns {Promise<void>}
   * @throws {Errors.InternalInconsistencyError} when the subscription or its topic does not exist
   */
  async subscriptionDeliveryProcess(dbCtx, subscriptionId, requestId) {
    const _scope = _fileScope('subscriptionDeliveryProcess');

    const logInfoData = {
      subscriptionId,
      requestId,
    };

    this.logger.debug(_scope, 'called', logInfoData);

    const subscription = await this.db.subscriptionGetById(dbCtx, subscriptionId);
    if (!subscription) {
      this.logger.error(_scope, 'no such subscription', logInfoData);
      throw new Errors.InternalInconsistencyError('no such subscription');
    }

    logInfoData.callback = subscription.callback;

    const topic = await this.db.topicGetContentById(dbCtx, subscription.topicId);
    if (!topic) {
      this.logger.error(_scope, 'no such topic', logInfoData);
      throw new Errors.InternalInconsistencyError('no such topic');
    }

    if (topic.isDeleted) {
      // If a topic has been set deleted, it does not list us as a valid hub.
      // Queue an unsubscription.
      const verification = {
        topicId: subscription.topicId,
        callback: subscription.callback,
        mode: Enum.Mode.Denied,
        reason: 'Gone: topic no longer valid on this hub.',
        isPublisherValidated: true,
        requestId,
      };

      await this.db.transaction(dbCtx, async (txCtx) => {
        await this.db.verificationInsert(txCtx, verification);
        await this.db.subscriptionDeliveryComplete(txCtx, subscription.callback, subscription.topicId, topic.contentUpdated);
      });
      this.logger.info(_scope, 'update unsubscription for deleted topic', logInfoData);
      return;
    }

    logInfoData.contentLength = topic.content.length;
    logInfoData.contentHash = topic.contentHash;

    const updateConfig = {
      method: 'POST',
      url: subscription.callback,
      body: topic.content,
      headers: {
        [Enum.Header.Link]: `<${topic.url}>; rel="self"${this.linkHub}`,
        [Enum.Header.ContentType]: topic.contentType || Enum.ContentType.TextPlain,
        // Content is only signed when the subscription has a secret.
        ...(subscription.secret && { [Enum.Header.XHubSignature]: Communication.signature(topic.content, subscription.secret, subscription.signatureAlgorithm) }),
      },
      responseType: 'text',
    };

    this.logger.info(_scope, 'update request', logInfoData);

    let response;
    try {
      response = await this.got(updateConfig);
    } catch (e) {
      this.logger.error(_scope, 'update request failed', { ...logInfoData, error: e });
      await this.db.subscriptionDeliveryIncomplete(dbCtx, subscription.callback, subscription.topicId, this.options.communication.retryBackoffSeconds);
      return;
    }
    logInfoData.response = common.gotResponseLogData(response);
    this.logger.debug(_scope, 'update response', logInfoData);

    switch (common.httpStatusCodeClass(response.statusCode)) {
      case 2:
        // Fall out of switch on success.
        break;

      case 5:
        this.logger.info(_scope, 'update remote server error', logInfoData);
        await this.db.subscriptionDeliveryIncomplete(dbCtx, subscription.callback, subscription.topicId, this.options.communication.retryBackoffSeconds);
        return;

      case 4:
        if (response.statusCode === 410) { // GONE
          this.logger.info(_scope, 'client declined further updates', logInfoData);
          await this.db.subscriptionDeliveryGone(dbCtx, subscription.callback, subscription.topicId);
          return;
        }
        // All other 4xx falls through as failure
        // falls through

      default:
        this.logger.info(_scope, 'update failed with non-2xx status code', logInfoData);
        await this.db.subscriptionDeliveryIncomplete(dbCtx, subscription.callback, subscription.topicId, this.options.communication.retryBackoffSeconds);
        return;
    }

    await this.db.subscriptionDeliveryComplete(dbCtx, subscription.callback, subscription.topicId, topic.contentUpdated);
    this.logger.info(_scope, 'update success', logInfoData);
  }


  /**
   * Claim and work a specific topic fetch task.
   * Does nothing further unless exactly one claim was made.
   * @param {*} dbCtx
   * @param {*} topicId
   * @param {String} requestId
   * @returns {Promise<void>}
   */
  async topicFetchClaimAndProcessById(dbCtx, topicId, requestId) {
    const _scope = _fileScope('topicFetchClaimAndProcessById');

    const claimResult = await this.db.topicFetchClaimById(dbCtx, topicId, this.options.communication.claimTimeoutSeconds, this.options.nodeId);
    // NOTE(review): loose comparison kept as-is; confirm the type of `changes` across db backends before tightening.
    if (claimResult.changes != 1) {
      this.logger.debug(_scope, 'did not claim topic fetch', { topicId, requestId });
      return;
    }
    await this.topicFetchProcess(dbCtx, topicId, requestId);
  }


  /**
   * Claim and work a specific verification confirmation task.
   * Does nothing further unless exactly one claim was made.
   * @param {*} dbCtx
   * @param {*} verificationId
   * @param {String} requestId
   * @returns {Promise<void>}
   */
  async verificationClaimAndProcessById(dbCtx, verificationId, requestId) {
    const _scope = _fileScope('verificationClaimAndProcessById');

    const claimResult = await this.db.verificationClaimById(dbCtx, verificationId, this.options.communication.claimTimeoutSeconds, this.options.nodeId);
    // NOTE(review): loose comparison kept as-is; confirm the type of `changes` across db backends before tightening.
    if (claimResult.changes != 1) {
      this.logger.debug(_scope, 'did not claim verification', { verificationId, requestId });
      return;
    }
    await this.verificationProcess(dbCtx, verificationId, requestId);
  }


  /**
   * Claim up to the wanted number of tasks and start processing each in its
   * own db context.  Topic fetches are claimed first, then verifications,
   * then subscription deliveries, each limited to what remains of wanted.
   * A failure while claiming is logged, not thrown; whatever was already
   * started is still returned.
   * @param {*} dbCtx
   * @param {Number} wanted maximum tasks to claim
   * @returns {Promise<Promise<void>[]>} the in-progress task promises
   */
  async workFeed(dbCtx, wanted) {
    const _scope = _fileScope('workFeed');
    const inProgress = [];
    const requestId = common.requestId();
    const claimTimeoutSeconds = this.options.communication.claimTimeoutSeconds;
    const nodeId = this.options.nodeId;
    let topicFetchPromises = [];
    let verificationPromises = [];
    let updatePromises = [];

    this.logger.debug(_scope, 'called', { wanted });

    try {
      if (wanted > 0) {
        // Update topics before anything else.
        const topicFetchIds = await this.db.topicFetchClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
        topicFetchPromises = topicFetchIds.map((id) => this.db.context((ctx) => this.topicFetchProcess(ctx, id, requestId)));
        inProgress.push(...topicFetchPromises);
        wanted -= topicFetchPromises.length;
      }

      if (wanted > 0) {
        // Then any pending verifications.
        const verifications = await this.db.verificationClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
        verificationPromises = verifications.map((id) => this.db.context((ctx) => this.verificationProcess(ctx, id, requestId)));
        inProgress.push(...verificationPromises);
        wanted -= verificationPromises.length;
      }

      if (wanted > 0) {
        // Finally dole out content.
        const updates = await this.db.subscriptionDeliveryClaim(dbCtx, wanted, claimTimeoutSeconds, nodeId);
        updatePromises = updates.map((id) => this.db.context((ctx) => this.subscriptionDeliveryProcess(ctx, id, requestId)));
        inProgress.push(...updatePromises);
        wanted -= updatePromises.length;
      }
    } catch (e) {
      this.logger.error(_scope, 'failed', { error: e });
      // do not re-throw, return what we've claimed so far
    }
    this.logger.debug(_scope, 'searched for work', { topics: topicFetchPromises.length, verifications: verificationPromises.length, updates: updatePromises.length, wantedRemaining: wanted, requestId });

    return inProgress;
  }


}
712
// The Communication class is this module's sole export.
module.exports = Communication;